In [1]:
# Pyspark libraries
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer

from pyspark.ml.regression import LinearRegression
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator

import numpy as np
import pandas as pd

# Data Visualization Libraries
import seaborn as sns
import matplotlib.pyplot as plt

from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

pd.set_option('display.max_columns', 200)
pd.set_option('display.max_colwidth', 400)

from matplotlib import rcParams
sns.set(context='notebook', style='darkgrid', rc={'figure.figsize': (20,6)})
rcParams['figure.figsize'] = 18,4

%matplotlib inline
%config InlineBackend.figure_format = 'retina'

In [2]:
# Spark Session: build (or reuse) a single session and derive the other entry points
# from it. getOrCreate() makes this cell safe to re-run, whereas constructing
# SparkContext(...) directly raises ValueError when a context is already active.
# (If a session already exists, the appName below is not applied to it.)
spark = SparkSession.builder.appName("Bitcoin Trend Analysis").getOrCreate()
sc = spark.sparkContext

# SQL Context: legacy API, kept because later cells call sqlContext.read / sqlContext.sql.
# It is bound explicitly to `spark` so temp views are shared between the two.
sqlContext = SQLContext(sc, spark)

In [3]:
# Load the Bitstamp 1-minute BTC/USD CSV into a Spark DataFrame.
# The machine-specific default path can be overridden with the BITCOIN_CSV_PATH
# environment variable instead of editing this cell.
import os

BITCOIN_CSV_PATH = os.environ.get(
    "BITCOIN_CSV_PATH",
    'D:/Projects/BitCoin-Trend Analysis/DataSets/bitstampUSD_1-min_data_2012-01-01_to_2020-04-22.csv',
)

# header=True: first line holds column names.
# inferSchema=True: Spark makes an extra pass over the file to detect column types.
df = sqlContext.read.csv(BITCOIN_CSV_PATH, header=True, inferSchema=True)

In [None]:
# Summary statistics (count, mean, stddev, min, max) for the numeric and string columns
# of the raw data, printed as a text table with Spark's default show() settings.
df.describe().show(n=20, truncate=True)

In [None]:
# Inspect the column names and the types Spark inferred while reading the CSV.
df.printSchema()

In [None]:
# Expose the raw data to SQL as the temporary view "bitCoinData".
# createOrReplaceTempView is the non-deprecated replacement for registerTempTable
# (deprecated since Spark 2.0), and it can be re-run safely.
df.createOrReplaceTempView("bitCoinData")

In [None]:
# Add a `dateTime` column: from_unixtime renders the Unix-epoch `Timestamp` (seconds)
# as a "yyyy-MM-dd HH:mm:ss" string in the Spark session time zone.
# All original columns are kept (select *).
bitCoinData = sqlContext.sql(
    "select *,from_unixtime(Timestamp) as `dateTime` from bitCoinData"
)

In [None]:
# Drop every row that has a null (or NaN) value in any column.
# dropna() with default arguments is the same operation as na.drop().
bitCoinData = bitCoinData.dropna()

In [None]:
# Number of rows left after dropping rows with missing values.
# count() is an action, so this triggers a full Spark job.
bitCoinData.count()

In [None]:
# Confirm the schema now includes the derived string column `dateTime`.
bitCoinData.printSchema()

In [None]:
# Replace the "Volume_(...)" column names, whose parentheses are awkward in code
# and SQL, with identifier-friendly names.
bitCoinData = (
    bitCoinData
    .withColumnRenamed("Volume_(BTC)", "VolBTC")
    .withColumnRenamed("Volume_(Currency)", "VolCurrency")
)

In [None]:
# Alias (not a copy) of the cleaned frame. The following cells only derive new
# frames from it via withColumn, so bitCoinData itself is left unchanged.
dateTimeDF = bitCoinData

In [None]:
# Split the "yyyy-MM-dd HH:mm:ss" `dateTime` string into separate `date` and `time`
# string columns, then take the hour of day ("HH") from `time`.
splitDF = (
    dateTimeDF
    .withColumn("date", split(col("dateTime"), " ").getItem(0))
    .withColumn("time", split(col("dateTime"), " ").getItem(1))
    # Hour of day: first ":"-separated field of `time` (still a string here).
    .withColumn("hour", split(col("time"), ":").getItem(0))
)

In [None]:
# Confirm the new string columns `date`, `time` and `hour` were added.
splitDF.printSchema()

In [None]:
# Preview 10 rows with the newly split date/time/hour columns.
splitDF.limit(10).show()

In [None]:
# Give the derived columns proper types:
#   date     -> DateType
#   hour     -> DoubleType (hour of day, 0-23)
#   dateTime -> TimestampType. The previous cast to DateType silently dropped the
#               time of day and turned the column into a duplicate of `date`.
splitDF = (
    splitDF
    .withColumn("date", col("date").cast(DateType()))
    .withColumn("hour", col("hour").cast(DoubleType()))
    .withColumn("dateTime", col("dateTime").cast(TimestampType()))
)

# Day of week derived from the date. Spark's dayofweek() numbers days
# 1 (Sunday) through 7 (Saturday).
splitDF = splitDF.withColumn("day_of_week", dayofweek(col("date")))

# Calendar year as an integer, so it sorts, correlates and plots numerically
# instead of as a string category. The year is the first "-"-separated field of
# the yyyy-MM-dd date. split() is used rather than year() because a later cell
# rebinds the name `year` to a Python list.
splitDF = splitDF.withColumn(
    "year", split(col("date"), "-").getItem(0).cast(IntegerType())
)

In [None]:
# Preview 10 rows after typing the columns and adding `day_of_week` and `year`.
splitDF.limit(10).show()

In [None]:
# Convert the Spark DataFrame to a pandas DataFrame for plotting.
# NOTE(review): toPandas() collects every row to the driver. With the full
# 1-minute history this may be very large; consider sampling or aggregating in
# Spark first.
pandasDF = splitDF.toPandas()

In [None]:
# Visualize correlations between the numeric columns as an annotated heatmap.
# Non-numeric columns (e.g. the `date` and `time` strings/objects) are excluded
# explicitly: DataFrame.corr() raises on them in pandas >= 2.0, and older
# versions only dropped them silently.
correlationMatrix = pandasDF.select_dtypes(include='number').corr()
f, ax = plt.subplots(figsize=(15, 15))
sns.heatmap(correlationMatrix, annot=True, linewidths=.5, fmt='.1f', ax=ax)

In [None]:
# Scatter plot of traded volume in BTC against traded volume in the quote currency,
# one semi-transparent point per row.
fig, ax = plt.subplots()
pandasDF.plot.scatter(x='VolBTC', y='VolCurrency', alpha=0.5, ax=ax)
ax.set_xlabel('BITCOIN Volume')
ax.set_ylabel('Currency Volume')
ax.set_title('BITCOIN AND Currency Scatter Plot')
plt.show()

In [None]:
# Open and High price for every row, drawn as two lines on a shared axis.
# The title and labels previously claimed "Distribution of High and Low Price" and
# described only the Open price. They now describe what is actually plotted.
# The x-axis is the pandas row index, presumably in the CSV's chronological order
# (TODO confirm). Gaps left by dropped rows are not shown.
fig, ax = plt.subplots(figsize=(20, 8))
pandasDF.Open.plot(kind='line', color='b', label='Open', alpha=0.5, linewidth=5,
                   grid=True, linestyle=':', ax=ax)
pandasDF.High.plot(color='r', label='High', linewidth=1, alpha=0.5,
                   grid=True, linestyle='-.', ax=ax)
ax.legend(loc='upper right')
ax.set_xlabel('Row index (1-minute observations)')
ax.set_ylabel('Price (USD)')
ax.set_title('Open and High Price')
plt.show()

In [None]:
# Histogram (50 bins) of the Open price across all rows.
# A title and an x-axis label are added so the figure stands on its own.
ax = pandasDF.Open.plot(kind='hist', bins=50)
ax.set_xlabel('Open price (USD)')
ax.set_title('Distribution of Open Price')
plt.show()

In [None]:
# Histogram (50 bins) of the Close price across all rows.
# A title and an x-axis label are added so the figure stands on its own.
ax = pandasDF.Close.plot(kind='hist', bins=50)
ax.set_xlabel('Close price (USD)')
ax.set_title('Distribution of Close Price')
plt.show()

In [None]:
# Pull the columns used by the following plots out of pandas as plain Python lists.
# NOTE(review): `hour` and `year` rebind names that `from pyspark.sql.functions import *`
# provides (the hour()/year() column functions). Any Spark code run after this cell
# that calls those functions would receive a list instead.
hour, weighted_price, volume_BTC, date_of_week, year = (
    pandasDF[columnName].tolist()
    for columnName in ("hour", "Weighted_Price", "VolBTC", "day_of_week", "year")
)

In [None]:
# Weighted price of every row against its hour of day (yellow circles).
fig, ax = plt.subplots()
ax.plot(hour, weighted_price, 'yo')
ax.set_xlabel('hour')
ax.set_ylabel('Weighted_Price')
ax.set_title('Price by Hour')
plt.show()

In [None]:
# Weighted price of every row against its day of week (red circles).
# Spark's dayofweek() numbers days 1 (Sunday) .. 7 (Saturday), so the ticks are
# labelled with day names. The title now says "Day of Week" rather than "By Week",
# because nothing here is aggregated per week.
fig, ax = plt.subplots()
ax.plot(date_of_week, weighted_price, 'ro')
ax.set_xticks(list(range(1, 8)))
ax.set_xticklabels(['Sun', 'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat'])
ax.set_xlabel('day_of_week')
ax.set_ylabel('Weighted_Price')
ax.set_title('Price by Day of Week')
plt.show()

In [None]:
# BTC volume of every row against its hour of day (blue stars).
fig, ax = plt.subplots()
ax.plot(hour, volume_BTC, 'b*')
ax.set_xlabel('hour')
ax.set_ylabel('VolBTC')
ax.set_title('Volume by Hour of day')
plt.show()

In [None]:
# BTC volume of every row against its day of week (cyan stars).
# Spark's dayofweek() numbers days 1 (Sunday) .. 7 (Saturday), so the ticks are
# labelled with day names. The title now says "Day of Week" rather than "By Week",
# because nothing here is aggregated per week.
fig, ax = plt.subplots()
ax.plot(date_of_week, volume_BTC, 'c*')
ax.set_xticks(list(range(1, 8)))
ax.set_xticklabels(['Sun', 'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat'])
ax.set_xlabel('day_of_week')
ax.set_ylabel('VolBTC')
ax.set_title('Volume by Day of Week')
plt.show()

In [None]:
# BTC volume of every row against its calendar year (black diamonds).
fig, ax = plt.subplots()
ax.plot(year, volume_BTC, 'kD')
ax.set_xlabel('year')
ax.set_ylabel('volume_BTC')
ax.set_title('volume_BTC plotted in yearly basis')
plt.show()

In [None]:
# Weighted price of every row against its calendar year (magenta diamonds).
fig, ax = plt.subplots()
ax.plot(year, weighted_price, 'mD')
ax.set_xlabel('year')
ax.set_ylabel('Weighted_Price')
ax.set_title('Weighted Price in yearly basis')
plt.show()