# Load Source Data

In [None]:
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, monotonically_increasing_id, to_timestamp
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format, lit
from pyspark.sql import types
import pyspark.sql.functions as F

In [None]:
# Locations of the two raw source files on S3: one CSV (Bitstamp) and one JSON (Coinbase).

bitstamp_path = "s3n://udacity-dend-crypto/bitcoin-historical-data/bitstamp.csv"
coinbase_path = "s3n://udacity-dend-crypto/bitcoin-historical-data/coinbase.json"

In [None]:
# Read each source file into its own Spark DataFrame and mark it for caching.
# The CSV carries a header row; the JSON is read with Spark's default JSON options.

bitData = spark.read.option("header", True).csv(bitstamp_path).persist()
coinData = spark.read.json(coinbase_path).persist()

# Initial Data Exploration & Cleaning

In [None]:
# Print the column names and types Spark assigned to the Bitstamp frame.
bitData.printSchema()

In [None]:
# Print the column names and types Spark assigned to the Coinbase frame.
coinData.printSchema()

All columns are strings when they should not be; this will need to be corrected (Timestamp should be timestamp type, all others should be double type).

In [None]:
# Swap the literal string 'NaN' for the string '0' in both source frames.
# The values are still strings at this point; the numeric casts happen later.

coinData = coinData.na.replace('NaN', '0')
bitData = bitData.na.replace('NaN', '0')

In [None]:
# Convert all non-Timestamp columns to doubles and Timestamp to a real timestamp.
#
# Two deliberate choices:
#   * the loop variable is NOT named `col`; that name is imported from
#     pyspark.sql.functions at the top of the notebook and a loop variable of the
#     same name would silently replace it for every later cell.
#   * each frame is cast using its own column list instead of assuming the
#     Bitstamp frame has exactly the Coinbase columns.

def _cast_source_columns(frame):
    """Return `frame` with Timestamp as TimestampType and every other column as DoubleType."""
    # frame.columns is evaluated once here, so rebinding `frame` inside the loop is safe.
    for column_name in frame.columns:
        if column_name != 'Timestamp':
            frame = frame.withColumn(column_name, frame[column_name].cast(types.DoubleType()))

    # Cast to integer first and then to timestamp, same two-step conversion as before.
    # NOTE(review): this presumably relies on Timestamp holding epoch seconds; confirm against the source files.
    as_integer = frame["Timestamp"].cast(types.IntegerType())
    return frame.withColumn("Timestamp", as_integer.cast(types.TimestampType()))


coinData = _cast_source_columns(coinData)
bitData = _cast_source_columns(bitData)

In [None]:
# Print both schemas again to confirm the casts took effect.

for checked_frame in (coinData, bitData):
    checked_frame.printSchema()

In [None]:
# Preview rows of the converted Coinbase frame.
coinData.show()

In [None]:
# Preview rows of the converted Bitstamp frame.
bitData.show()

Everything looks good, so we can begin pipelining our data into the analytics tables.

# Data Pipeline into Analytics tables

In [None]:
# Build the complete time table: one row per distinct timestamp found in either
# source, sorted ascending, plus the calendar parts derived from that timestamp.

distinct_stamps = (
    coinData.select('Timestamp')
    .union(bitData.select('Timestamp'))
    .distinct()
    .orderBy("Timestamp", ascending=True)
)
stamp = distinct_stamps["Timestamp"]

# Column order matches the table layout: Timestamp, Date, Month, DayOfMonth,
# Year, Week, DayOfWeek, Hour, Minute.
time = distinct_stamps.select(
    stamp,
    stamp.cast(types.DateType()).alias("Date"),
    month(stamp).alias("Month"),
    dayofmonth(stamp).alias("DayOfMonth"),
    year(stamp).alias("Year"),
    weekofyear(stamp).alias("Week"),
    F.dayofweek(stamp).alias("DayOfWeek"),
    hour(stamp).alias("Hour"),
    F.minute(stamp).alias("Minute"),
)

In [None]:
# Write the time table to parquet files, partitioned by Year / Month / DayOfMonth.
# mode("overwrite") keeps this cell re-runnable: with the default save mode the
# write raises as soon as the target path already exists.

time.write.mode("overwrite").partitionBy("Year", "Month", "DayOfMonth").parquet('s3n://udacity-dend-crypto/analytics/' + 'time')

In [None]:
# Create the Markets table: one row per exchange, columns (Id, Market).
# The rows are given as tuples with explicit column names through the same `spark`
# session used everywhere else in the notebook, rather than letting an RDD of
# dicts drive schema inference via `sc`.

markets = spark.createDataFrame([(1, "Coinbase"), (2, "Bitstamp")], ["Id", "Market"])


In [None]:
# Write the markets table to parquet files.
# The data-quality section reads this path back, so the write has to actually run;
# mode("overwrite") keeps the cell safe to execute more than once.

markets.write.mode("overwrite").parquet('s3n://udacity-dend-crypto/analytics/' + 'markets')

In [None]:
# Creating the transactions table


# Tag every source row with a constant market id: 1 for the Coinbase frame, 2 for the Bitstamp frame.
coinData = coinData.withColumn("marketId", F.lit(1))
bitData = bitData.withColumn("marketId", F.lit(2))


In [None]:
# Column names containing "_(...)" cause problems, so copy each one under a plain name.
volume_aliases = (
    ("VolBTC", "Volume_(BTC)"),
    ("VolCurrency", "Volume_(Currency)"),
)
for plain_name, source_name in volume_aliases:
    coinData = coinData.withColumn(plain_name, coinData[source_name])
    bitData = bitData.withColumn(plain_name, bitData[source_name])

In [None]:
# Project both source frames onto one shared, ordered list of column expressions
# so that the two frames line up for the union.

transaction_columns = [
    "Timestamp",
    "marketID",
    "Close",
    "High",
    "Low",
    "Open",
    "VolBTC",
    "VolCurrency",
    "Weighted_Price as WeightedPrice",
]

bitData = bitData.selectExpr(*transaction_columns)
coinData = coinData.selectExpr(*transaction_columns)


In [None]:
# Stack the Coinbase rows and the Bitstamp rows into a single transactions frame.

transactions = coinData.union(bitData)

In [None]:
# Order the combined rows by timestamp, oldest first.

transactions = transactions.orderBy("Timestamp", ascending=True)

In [None]:
# Add the transaction id column, generated by Spark's monotonically increasing id function.

transactions = transactions.withColumn("Id", F.monotonically_increasing_id())

In [None]:
# Write transactions to parquet files, partitioned by market id.
# The data-quality section reads this path back, so the write has to actually run;
# mode("overwrite") keeps the cell safe to execute more than once.
transactions.write.mode("overwrite").partitionBy('marketID').parquet('s3n://udacity-dend-crypto/analytics/' + 'transactions')

# Data Quality Checks

Now that our data has been loaded into the analytics schema parquet files, let's make sure everything loaded correctly by comparing the source files to the newly loaded parquet files

In [None]:
# Read the untouched source files again so they can be compared with the analytics tables.

bitData = spark.read.option("header", True).csv(bitstamp_path).persist()
coinData = spark.read.json(coinbase_path).persist()

In [None]:
# Read the analytics tables back from their parquet locations.

time_path = "s3n://udacity-dend-crypto/analytics/time"
market_path = "s3n://udacity-dend-crypto/analytics/markets"
transaction_path = "s3n://udacity-dend-crypto/analytics/transactions"

transactions = spark.read.parquet(transaction_path)
markets = spark.read.parquet(market_path)
# Bind the loaded frame to `time`, not `time_path`. Otherwise the checks and the
# SQL view below keep using the in-memory frame built earlier and never look at
# what was actually written to parquet.
time = spark.read.parquet(time_path)

In [None]:
# Row-count check: the two source files combined should have exactly as many
# rows as the transactions table (prints True when they match).

source_row_total = coinData.count() + bitData.count()
print(source_row_total == transactions.count())

In [None]:
# Gather the distinct timestamps present across both source files; the next
# cell compares their count with the time table.

coinbase_stamps = coinData.select('Timestamp')
bitstamp_stamps = bitData.select('Timestamp')
sourceTimes = coinbase_stamps.union(bitstamp_stamps).distinct()


In [None]:
# Prints True when the time table has one row per distinct source timestamp.
source_time_total = sourceTimes.count()
print(source_time_total == time.count())

Awesome, looks like our data was transformed and loaded in correctly! 

# Analytics

Now let's put ourselves in the shoes of the analyst and see whether we can answer some analytical questions using the analytics schema. Say we are interested specifically in the year 2018:

1. Can we get a month by month view comparing Bitstamp vs Coinbase Volume totals? 
2. Can we see which platform had higher volume for the whole of 2018?

In [None]:
# Register the three analytics tables as temp views so they can be queried with SQL.
analytics_views = (
    ("transactions", transactions),
    ("markets", markets),
    ("time", time),
)
for view_name, view_frame in analytics_views:
    view_frame.createOrReplaceTempView(view_name)

In [None]:
def monthly_btc_volume(market_name, total_alias, target_year=2018):
    """Return total VolBTC per (Year, Month) for one market in one year.

    Queries the `transactions`, `time` and `markets` temp views.

    Parameters:
        market_name: value matched against markets.Market (e.g. "Coinbase").
        total_alias: name given to the summed-volume column in the result.
        target_year: calendar year to keep (defaults to 2018).

    Returns:
        A DataFrame with columns Year, Month and `total_alias`.
    """
    # The interpolated values are constants chosen in this notebook, not external input.
    query = '''
SELECT ts.Year, ts.Month, sum(tr.VolBTC) as {total_alias}
FROM transactions tr JOIN time ts on tr.Timestamp = ts.Timestamp JOIN markets m on m.Id = tr.marketId
WHERE ts.Year = {target_year} and m.Market = "{market_name}"
GROUP BY ts.Year, ts.Month
'''.format(total_alias=total_alias, target_year=int(target_year), market_name=market_name)
    return spark.sql(query)


Coin2018 = monthly_btc_volume("Coinbase", "totalCoinbaseBTC")
Bit2018 = monthly_btc_volume("Bitstamp", "totalBitstampBTC")

# Join on Year AND Month: matching on Month alone is only correct while both
# sides hold a single year. Joining on a list of names also leaves one Year and
# one Month column in the result, so the select below is unambiguous.
entire2018 = (
    Coin2018.join(Bit2018, on=["Year", "Month"])
    .select("Year", "Month", "totalCoinbaseBTC", "totalBitstampBTC")
    .sort("Month", ascending=True)
    .persist()
)

In [None]:
# Display the month-by-month Coinbase vs Bitstamp BTC volume totals.
entire2018.show()

Awesome, we have a monthly view of Coinbase and Bitstamp totals! Keep in mind this was put together from minute-by-minute data for the whole year, consisting of more than 6 million rows!

In [None]:
# Roll the monthly totals up into one row per year.
# The column names are spelled exactly as they were aliased in the monthly query,
# so the lookup does not depend on case-insensitive name resolution.
entire2018.groupBy("Year").sum("totalCoinbaseBTC", "totalBitstampBTC").show()

We were also able to quickly aggregate the monthly information into yearly totals! Looks like Coinbase wins! 