#### Master thesis
Methods for economic big data analysis
@Adam Wiszniewski

#### 0. Test Spark Cluster

In [3]:
# Trivial expression to confirm the notebook can execute a cell.
1+1

In [4]:
import datetime
# Parenthesised single-argument print behaves identically on Python 2 and is also valid on Python 3.
print("This was last run on: %s" % datetime.datetime.now())

In [5]:
# Display the `sc` object (used below via sc.parallelize) to confirm it is defined in this session.
sc

In [6]:
# Smoke test: count word occurrences with a small map / reduceByKey job.
words = sc.parallelize(["hello", "world", "goodbye", "hello", "again"])
wordcounts = (
    words
    .map(lambda word: (word, 1))
    .reduceByKey(lambda left, right: left + right)
    .collect()
)
wordcounts

#### 1. ETL - Data loading to cluster
In this step we load the data into DataFrame objects and preprocess it.

In [8]:
#function to load Stock data to DataFrame and apply schema on it to enable computation
from datetime import datetime
from pyspark.sql.functions import col,udf, unix_timestamp
from pyspark.sql import SQLContext
from pyspark.sql.types import *


def loadStockDatasetToDF(datasetPath):
    """Load a stock-price CSV file into a Spark DataFrame with a typed schema.

    The file is read with a header row and the columns Date, Open, High, Low,
    Close and Volume. The textual ``Date`` column (format ``%d-%b-%y``) is
    replaced by a ``DateNew`` column of ``DateType``.

    Args:
        datasetPath: location of the CSV file to read.

    Returns:
        DataFrame with the columns Open, High, Low, Close, Volume and DateNew.
    """
    # Date is read as a string first; it is parsed into a real date below.
    customSchema = StructType([
        StructField("Date", StringType(), True),
        StructField("Open", FloatType(), True),
        StructField("High", FloatType(), True),
        StructField("Low", FloatType(), True),
        StructField("Close", FloatType(), True),
        StructField("Volume", FloatType(), True)])

    # load() applies its own ``format`` argument, so the data source is named only once here.
    df = sqlContext.read.load(datasetPath,
                              format='com.databricks.spark.csv',
                              header='true',
                              schema=customSchema)

    # The Date field is nullable and strptime() raises TypeError on None,
    # so a missing date is mapped to null instead of failing the whole job.
    parseDate = udf(
        lambda x: datetime.strptime(x, '%d-%b-%y') if x is not None else None,
        DateType())
    df = df.withColumn('DateNew', parseDate(col('Date'))).drop('Date')

    return df


In [9]:
#function to load Forex data to DataFrame and apply schema on it to enable computation
from datetime import datetime
from pyspark.sql.functions import col,udf, unix_timestamp
from pyspark.sql import SQLContext
from pyspark.sql.types import *


def loadForexDatasetToDF(datasetPath):
    """Load a forex CSV file into a Spark DataFrame with a typed schema.

    The file is read with a header row and the columns Date, Close, Open, High,
    Low and Change. The textual ``Date`` column (format ``%d-%b-%y``) is
    replaced by a ``DateNew`` column of ``DateType``.

    Args:
        datasetPath: location of the CSV file to read.

    Returns:
        DataFrame with the columns Close, Open, High, Low, Change and DateNew.
    """
    # Schema for forex data; Date and Change are kept as strings at load time.
    customSchema = StructType([
        StructField("Date", StringType(), True),
        StructField("Close", FloatType(), True),
        StructField("Open", FloatType(), True),
        StructField("High", FloatType(), True),
        StructField("Low", FloatType(), True),
        StructField("Change", StringType(), True)])

    # load() applies its own ``format`` argument, so the data source is named only once here.
    df = sqlContext.read.load(datasetPath,
                              format='com.databricks.spark.csv',
                              header='true',
                              schema=customSchema)

    # The Date field is nullable and strptime() raises TypeError on None,
    # so a missing date is mapped to null instead of failing the whole job.
    parseDate = udf(
        lambda x: datetime.strptime(x, '%d-%b-%y') if x is not None else None,
        DateType())
    df = df.withColumn('DateNew', parseDate(col('Date'))).drop('Date')

    return df

In [10]:
from pyspark.sql.functions import format_number

#Function preprocess Forex data and returns dataframe with Date, Open, Close and DailyChange
def forexPreprocess(ForexDataFrame):
  """Reduce a loaded forex DataFrame to date, open, close and daily change.

  Args:
      ForexDataFrame: DataFrame with DateNew, Open and Close columns.

  Returns:
      DataFrame with DateNew, ForexOpen, ForexClose and ForexDailyChangePrc,
      where ForexDailyChangePrc is (Close / Open) * 100 - 100 rounded to two
      decimals and stored as float.
  """
  # Imported locally under another name so the Python builtin round() is not shadowed.
  from pyspark.sql.functions import round as sparkRound

  df = ForexDataFrame.select(
      ForexDataFrame.DateNew,
      ForexDataFrame.Open.alias("ForexOpen"),
      ForexDataFrame.Close.alias("ForexClose"))
  dailyChange = ((df.ForexClose / df.ForexOpen) * 100 - 100).cast("float")
  # Round numerically: format_number() yields a string with thousands separators
  # (e.g. "1,234.50"), which becomes null when cast back to float.
  return df.select(
      'DateNew', 'ForexOpen', 'ForexClose',
      sparkRound(dailyChange, 2).cast("float").alias('ForexDailyChangePrc'))

#Function preprocess Stock data and returns dataframe with Date, Open, Close and DailyChange
def stockPreprocess(StockDataFrame):
  """Reduce a loaded stock DataFrame to date, open, close, daily change and volume.

  Args:
      StockDataFrame: DataFrame with DateNew, Volume, Open and Close columns.

  Returns:
      DataFrame with DateNew, StockOpen, StockClose, StockDailyChangePrc and
      StockVolume, where StockDailyChangePrc is (Close / Open) * 100 - 100
      rounded to two decimals and stored as float.
  """
  # Imported locally under another name so the Python builtin round() is not shadowed.
  from pyspark.sql.functions import round as sparkRound

  df = StockDataFrame.select(
      StockDataFrame.DateNew,
      StockDataFrame.Volume.alias('StockVolume'),
      StockDataFrame.Open.alias("StockOpen"),
      StockDataFrame.Close.alias("StockClose"))
  dailyChange = ((df.StockClose / df.StockOpen) * 100 - 100).cast("float")
  # Round numerically: format_number() yields a string with thousands separators
  # (e.g. "1,234.50"), which becomes null when cast back to float.
  return df.select(
      'DateNew', 'StockOpen', 'StockClose',
      sparkRound(dailyChange, 2).cast("float").alias('StockDailyChangePrc'),
      'StockVolume')


In [11]:
def joinTwoDataFrames(df1,df2):
  """Inner-join ``df1`` with ``df2`` on ``DateNew`` and return the result sorted by ``DateNew``."""
  joined = df1.join(df2, 'DateNew', 'inner')
  return joined.sort('DateNew')

In [12]:
from pyspark.sql.functions import lag, col
from pyspark.sql.window import Window

def calculateChangeOvernight(joinedDF):
  """Add overnight (previous close to current open) change columns and up/down flags.

  For both the stock and the forex series the change is
  (Open / previous row's Close) * 100 - 100, where "previous row" is taken in
  DateNew order. The earliest row has no previous row, so lag() yields null
  there and the derived columns are null as well.

  Args:
      joinedDF: DataFrame with DateNew, StockVolume, StockOpen, StockClose,
          StockDailyChangePrc, ForexOpen, ForexClose and ForexDailyChangePrc.

  Returns:
      DataFrame with the input columns plus StockCloseToOpenChangePrc (double)
      and ForexCloseToOpenChangePrc (float), both rounded to two decimals, and
      the integer flags ForexOvernightUp and StockOvernightUp (1 when the
      unrounded overnight change is greater than 0).
  """
  # Imported locally under another name so the Python builtin round() is not shadowed.
  from pyspark.sql.functions import round as sparkRound

  # A single unpartitioned window ordered by date, so lag() reads the previous row by date.
  wSpec = Window.partitionBy().orderBy(col("DateNew"))

  df = (joinedDF
        .withColumn('PrevDayStockClose', lag(joinedDF['StockClose']).over(wSpec))
        .withColumn('PrevDayForexClose', lag(joinedDF['ForexClose']).over(wSpec)))
  df = (df
        .withColumn('StockCloseToOpenChangePrc',
                    ((df.StockOpen / df.PrevDayStockClose) * 100 - 100).cast("double"))
        .withColumn('ForexCloseToOpenChangePrc',
                    ((df.ForexOpen / df.PrevDayForexClose) * 100 - 100).cast("float")))
  # The flags are derived from the unrounded changes, before rounding for output.
  df = (df
        .withColumn('ForexOvernightUp', (df.ForexCloseToOpenChangePrc > 0).cast('integer'))
        .withColumn('StockOvernightUp', (df.StockCloseToOpenChangePrc > 0).cast('integer')))

  # Round numerically: format_number() yields a string with thousands separators,
  # which becomes null when cast back to a number. The helper PrevDay* columns are
  # dropped implicitly because they are not selected.
  return df.select(
      'DateNew', 'StockVolume', 'StockOpen', 'StockClose', 'StockDailyChangePrc',
      sparkRound(df.StockCloseToOpenChangePrc, 2).cast("double").alias('StockCloseToOpenChangePrc'),
      'ForexOpen', 'ForexClose', 'ForexDailyChangePrc',
      sparkRound(df.ForexCloseToOpenChangePrc, 2).cast("float").alias('ForexCloseToOpenChangePrc'),
      'ForexOvernightUp', 'StockOvernightUp')

Store the data file locations in lists so they are easy to iterate over.

In [14]:
# File locations of the input CSVs, grouped by role.

# Currency series; entries are picked by index when the dictionaries are built.
currencyListLocation = [
    '/FileStore/tables/suarsxcm1478429136036/USDCHF_DAILY.csv',
]

# Swiss companies.
swissCompListLocation = [
    '/FileStore/tables/d1xhfgxz1480700568620/swiss_abb.csv',
    '/FileStore/tables/suarsxcm1478429136036/swiss_cs.csv',
    '/FileStore/tables/suarsxcm1478429136036/swiss_nvs.csv',
    '/FileStore/tables/suarsxcm1478429136036/swiss_syt.csv',
    '/FileStore/tables/suarsxcm1478429136036/swiss_ubs.csv',
]

# Non-Swiss companies.
nonSwissCompListLocation = [
    '/FileStore/tables/suarsxcm1478429136036/jnj.csv',
    '/FileStore/tables/suarsxcm1478429136036/jpm.csv',
    '/FileStore/tables/suarsxcm1478429136036/ko.csv',
    '/FileStore/tables/suarsxcm1478429136036/pg.csv',
    '/FileStore/tables/suarsxcm1478429136036/ge.csv',
    '/FileStore/tables/l893vb431480695818503/baba.csv',
    '/FileStore/tables/l893vb431480695818503/bac.csv',
    '/FileStore/tables/l893vb431480695818503/cvx.csv',
    '/FileStore/tables/l893vb431480695818503/orcl.csv',
    '/FileStore/tables/l893vb431480695818503/p.csv',
    '/FileStore/tables/l893vb431480695818503/pep.csv',
    '/FileStore/tables/l893vb431480695818503/t.csv',
    '/FileStore/tables/l893vb431480695818503/wfc.csv',
    '/FileStore/tables/l893vb431480695818503/xom.csv',
]


In [15]:
def getDataName(Object):
  """Return the dataset name of a file path: the file name without directory and extension.

  Example: '/FileStore/tables/suarsxcm1478429136036/swiss_cs.csv' -> 'swiss_cs'.

  Args:
      Object: '/'-separated path string of a data file.

  Returns:
      The last path component with its extension removed.
  """
  # Derive the name from the path structure instead of fixed character offsets,
  # so it works for any directory depth and any extension length.
  import posixpath
  fileName = posixpath.basename(Object)
  return posixpath.splitext(fileName)[0]

In [16]:

  
def buildDataFramesDict(LocationList,currencyNumber):
  """Build a dictionary of combined stock/forex DataFrames, one per stock file.

  Args:
      LocationList: iterable of stock CSV locations.
      currencyNumber: index into the module-level ``currencyListLocation``
          selecting the forex file every stock is joined with.

  Returns:
      dict mapping getDataName(location) to the DataFrame produced by
      calculateChangeOvernight for that stock joined with the forex data.
  """
  # The forex frame is identical for every stock, so it is defined once
  # instead of being re-loaded and re-preprocessed on each iteration.
  forexDF = forexPreprocess(loadForexDatasetToDF(currencyListLocation[currencyNumber]))

  d = {}
  for x in LocationList:
    stockDF = stockPreprocess(loadStockDatasetToDF(x))
    d[getDataName(x)] = calculateChangeOvernight(joinTwoDataFrames(stockDF, forexDF))
  return d
  

Build dictionary objects that map each stock symbol (key) to a DataFrame combining that stock's data with the forex data (value).

In [18]:
# Both company groups are paired with the first entry of currencyListLocation.
swissCompDict = buildDataFramesDict(LocationList=swissCompListLocation, currencyNumber=0)
nonSwissCompDict = buildDataFramesDict(LocationList=nonSwissCompListLocation, currencyNumber=0)


#### 3. Test loaded data

In [20]:
# List the dataset names that were loaded for the Swiss companies.
list(swissCompDict.keys())


In [21]:
# Inspect the forex data as loaded, before any preprocessing.
df1 = loadForexDatasetToDF('/FileStore/tables/suarsxcm1478429136036/USDCHF_DAILY.csv')
display(df1)

In [22]:
# List the dataset names that were loaded for the non-Swiss companies.
list(nonSwissCompDict.keys())

In [23]:
# Combined frame for swiss_abb with rows containing nulls removed.
# NOTE(review): `df` is reassigned in the next cell before it is used, so this value is discarded.
df = swissCompDict["swiss_abb"].na.drop()


In [24]:
# Combined frame for pg with rows containing nulls removed; this `df` feeds the machine learning section.
df = nonSwissCompDict["pg"].na.drop()
display(df)

In [25]:
# Summary statistics of the preprocessed forex series.
# The location is taken from currencyListLocation instead of repeating the literal path.
df_f = forexPreprocess(loadForexDatasetToDF(currencyListLocation[0]))
# describe() only builds a DataFrame; it has to be displayed for the statistics to be rendered.
display(df_f.describe())

In [26]:
# Frequent items of three forex columns, computed with a support threshold of 0.2.
display(df_f.stat.freqItems(["ForexOpen","ForexClose","ForexDailyChangePrc"],0.2))

#### 4. Machine Learning

In [28]:
# Drop rows containing nulls and remember the original column names;
# `cols` is used later to keep these columns next to "label" and "features".
dataset = df.na.drop()
cols = dataset.columns


In [29]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

categoricalColumns = ["ForexOvernightUp"]
# Pipeline stages are collected here and executed together when the pipeline is fitted.
stages = []
for categoricalCol in categoricalColumns:
  indexedCol = categoricalCol + "Index"
  # Index the category values first ...
  stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=indexedCol)
  # ... then one-hot encode the index into a vector column.
  encoder = OneHotEncoder(inputCol=indexedCol, outputCol=categoricalCol + "classVec")
  stages.extend([stringIndexer, encoder])

In [30]:
# Convert label into label indices using the StringIndexer
# NOTE(review): StringIndexer gives index 0 to the most frequent value, so "label" 1.0 may not
# correspond to StockOvernightUp == 1 - confirm before interpreting per-class counts.
label_stringIdx = StringIndexer(inputCol = "StockOvernightUp", outputCol = "label")
stages += [label_stringIdx]

In [31]:
# Transform all features into a vector using VectorAssembler

numericCols = ["StockVolume", "StockOpen", "StockClose", "StockDailyChangePrc", "ForexOpen","ForexClose", "ForexDailyChangePrc"]
# A list comprehension is used because map() returns an iterator on Python 3,
# which cannot be concatenated with a list.
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [32]:
# Create a Pipeline.
pipeline = Pipeline(stages=stages)
# Run the feature transformations.
#  - fit() computes feature statistics as needed.
#  - transform() actually transforms the features.
# NOTE(review): `dataset` is overwritten with the transformed frame, so this cell is not
# idempotent - re-running it feeds the already-transformed data back into the pipeline.
pipelineModel = pipeline.fit(dataset)
dataset = pipelineModel.transform(dataset)

# Keep relevant columns
selectedcols = ["label", "features"] + cols
dataset = dataset.select(selectedcols)
display(dataset)

In [33]:
### Randomly split data into training and test sets. set seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.8, 0.2], seed = 100)
# Parenthesised single-argument print works on both Python 2 and Python 3.
print(trainingData.count())
print(testData.count())

In [34]:
from pyspark.ml.classification import LogisticRegression

# Create initial LogisticRegression model (at most 10 iterations) on the "label" / "features" columns
lr = LogisticRegression(labelCol="label", featuresCol="features", maxIter=10)

# Train model with Training Data
lrModel = lr.fit(trainingData)

In [35]:
# Make predictions on test data using the transform() method.
# LogisticRegression.transform() will only use the 'features' column.
# The result keeps the test rows and adds the model's output columns.
predictions = lrModel.transform(testData)

In [36]:
# Show which columns the model added to the test data.
predictions.printSchema()

In [37]:
# Only the last expression of a cell is rendered, so each count is printed explicitly.
print("test rows: %d" % predictions.count())
print("prediction = 0 AND label = 1: %d" % predictions.where("prediction = 0 AND label=1").count())
print("prediction = 1 AND label = 0: %d" % predictions.where("prediction = 1 AND label=0").count())


In [38]:
# Show label and prediction side by side with the forex columns for manual inspection.
selected = predictions.select("label", "prediction", "ForexOpen", "ForexClose", "ForexCloseToOpenChangePrc")
display(selected)

In [39]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# Evaluate model
# The evaluator scores the "rawPrediction" column; the metric name is shown in the next cell.
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
evaluator.evaluate(predictions)

In [40]:
# Show which metric evaluate() reported above.
evaluator.getMetricName()

Rough guide for interpreting the area under the ROC curve (source: http://gim.unmc.edu/dxtests/roc3.htm):

- .90-1 = excellent (A)
- .80-.90 = good (B)
- .70-.80 = fair (C)
- .60-.70 = poor (D)
- .50-.60 = fail (F)

### 4.2 Tuning of logistic regression

In [43]:
# Parenthesised single-argument print works on both Python 2 and Python 3.
print(lr.explainParams())

In [44]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Create ParamGrid for Cross Validation
# Three values for each of three parameters: 3 x 3 x 3 = 27 combinations.
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.01, 0.5, 2.0])
             .addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])
             .addGrid(lr.maxIter, [1, 5, 10])
             .build())

In [45]:
# Create 5-fold CrossValidator over the parameter grid, scored with the evaluator defined earlier
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)

# Run cross validations
cvModel = cv.fit(trainingData)
# this will likely take a fair amount of time because of the amount of models that we're creating and testing

In [46]:
# Use test set here so we can measure the accuracy of our model on new data
# NOTE: this replaces the `predictions` produced by the untuned model above.
predictions = cvModel.transform(testData)

In [47]:
# Score the cross-validated model's predictions with the same evaluator as before.
evaluator.evaluate(predictions)

### Generic model which allows us to test our dictionaries

In [49]:


### Randomly split data into training and test sets. set seed for reproducibility
# Same split parameters and seed as the earlier split cell.
(trainingData, testData) = dataset.randomSplit([0.8, 0.2], seed = 100)


from pyspark.ml.classification import LogisticRegression

def trainMLModel(MLReadyDataFrame, trainingData):
  """Fit a logistic regression (at most 10 iterations) on ``trainingData``.

  Args:
      MLReadyDataFrame: accepted but not used by the body.
      trainingData: DataFrame with "label" and "features" columns.

  Returns:
      The fitted logistic regression model.
  """
  estimator = LogisticRegression(maxIter=10, featuresCol="features", labelCol="label")
  return estimator.fit(trainingData)

def testMLModel(trainedModel, testData):
  # Make predictions on test data using the transform() method.
  # LogisticRegression.transform() will only use the 'features' column.
  predictions = lrModel.transform(testData)
  return predictions

from pyspark.ml.evaluation import BinaryClassificationEvaluator

def evaluateMLModel(predictions):
  """Score ``predictions`` with a BinaryClassificationEvaluator on the "rawPrediction" column.

  Args:
      predictions: DataFrame produced by a model's transform().

  Returns:
      The value returned by the evaluator's evaluate().
  """
  # The ':' after the signature was missing, which made the whole cell a SyntaxError.
  evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction")
  return evaluator.evaluate(predictions)


In [50]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

def buildInputMLObject(dataset):
  """Turn a combined stock/forex DataFrame into a "label" / "features" frame for Spark ML.

  The pipeline indexes and one-hot encodes ForexOvernightUp, indexes
  StockOvernightUp into "label", and assembles the encoded column together
  with the numeric columns into "features".

  NOTE(review): StringIndexer gives index 0 to the most frequent value, so
  "label" 1.0 may not correspond to StockOvernightUp == 1 - confirm before
  interpreting per-class counts.

  Args:
      dataset: DataFrame containing ForexOvernightUp, StockOvernightUp and
          the numeric columns listed in ``numericCols`` below.

  Returns:
      DataFrame with "label", "features" and all columns of the input frame.
  """
  # Capture the caller's columns before the pipeline adds its own, instead of
  # depending on a notebook-global `cols` computed from a different frame.
  inputCols = dataset.columns

  categoricalColumns = ["ForexOvernightUp"]
  numericCols = ["StockVolume", "StockOpen", "StockClose", "StockDailyChangePrc", "ForexOpen","ForexClose", "ForexDailyChangePrc"]

  stages = [] # stages in our Pipeline
  for categoricalCol in categoricalColumns:
    # Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol+"Index")
    # Use OneHotEncoder to convert categorical variables into binary SparseVectors
    encoder = OneHotEncoder(inputCol=categoricalCol+"Index", outputCol=categoricalCol+"classVec")
    # Add stages.  These are not run here, but will run all at once later on.
    stages += [stringIndexer, encoder]

  # Convert label into label indices using the StringIndexer
  label_stringIdx = StringIndexer(inputCol = "StockOvernightUp", outputCol = "label")
  stages += [label_stringIdx]

  # Transform all features into a vector using VectorAssembler.
  # A list comprehension is used because map() returns an iterator on Python 3,
  # which cannot be concatenated with a list.
  assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
  assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
  stages += [assembler]

  # Run the feature transformations.
  #  - fit() computes feature statistics as needed.
  #  - transform() actually transforms the features.
  pipelineModel = Pipeline(stages=stages).fit(dataset)
  transformed = pipelineModel.transform(dataset)

  # Keep relevant columns
  selectedcols = ["label", "features"] + inputCols
  MLReadyDataFrame = transformed.select(selectedcols)
  return MLReadyDataFrame

In [51]:
from pyspark.ml.classification import LogisticRegression

def trainMLModel(MLReadyDataFrame, trainingData):
  """Train a logistic regression (at most 10 iterations) on ``trainingData``.

  Args:
      MLReadyDataFrame: accepted but not used by the body.
      trainingData: DataFrame with "label" and "features" columns.

  Returns:
      The fitted logistic regression model.
  """
  estimator = LogisticRegression(maxIter=10, featuresCol="features", labelCol="label")
  return estimator.fit(trainingData)

def testMLModel(trainedModel, testData):
  # Make predictions on test data using the transform() method.
  # LogisticRegression.transform() will only use the 'features' column.
  predictions = trainedModel.transform(testData)
  return predictions

In [52]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

def evaluateMLModel(predictions):
  """Return the BinaryClassificationEvaluator score of ``predictions``.

  The evaluator reads the "rawPrediction" column.
  """
  return BinaryClassificationEvaluator(rawPredictionCol="rawPrediction").evaluate(predictions)

In [53]:
def calculateLogisticRegressionAccuracy(inputDict):
  """Train and evaluate a logistic regression for every DataFrame in ``inputDict``.

  For each entry the frame is cleaned of nulls, converted with
  buildInputMLObject, split 80/20 (seed 100), trained with trainMLModel,
  scored with testMLModel and evaluated with evaluateMLModel. One line per
  entry is printed as progress output.

  NOTE(review): "label" comes from a StringIndexer, which orders values by
  frequency, so label 1 is not necessarily "overnight up" - confirm before
  interpreting the FN/FP/TP/TN columns.

  Args:
      inputDict: dict mapping a dataset name to a combined stock/forex
          Spark DataFrame.

  Returns:
      pandas DataFrame with one row per entry and the columns
      key, count, FN, FP, TP, TN and ROC_eval.
  """
  import pandas as pd

  columns = ["key", "count", "FN", "FP", "TP", "TN", "ROC_eval"]
  # The header names now match the counts below (the second count was labelled TN).
  print("key | count | FN | FP | TP | TN | ROC_eval ")
  rows = []
  for key, value in inputDict.items():
      #drop all nulls
      stock = value.na.drop()
      #transform data to ML friendly format
      MLinput = buildInputMLObject(stock)
      #split data into training and test sets
      (trainingData, testData) = MLinput.randomSplit([0.8, 0.2], seed = 100)
      #train the model
      model = trainMLModel(MLinput, trainingData)
      #test model
      predictions = testMLModel(model, testData)
      #evaluate results
      testCount = predictions.count()
      FN = predictions.where("prediction = 0 AND label=1").count() # predicted 0, actual 1
      FP = predictions.where("prediction = 1 AND label=0").count() # predicted 1, actual 0
      TP = predictions.where("prediction = 1 AND label=1").count()
      TN = predictions.where("prediction = 0 AND label=0").count()
      result = evaluateMLModel(predictions)
      row = (key, testCount, FN, FP, TP, TN, result)
      print(" | ".join(str(v) for v in row))
      rows.append(row)
  # Return the collected statistics rather than the unrelated notebook-global `df`.
  return pd.DataFrame(rows, columns=columns)

In [54]:
# Run the per-company logistic regression evaluation for the Swiss companies.
calculateLogisticRegressionAccuracy(swissCompDict)

In [55]:
# Run the per-company logistic regression evaluation for the non-Swiss companies.
calculateLogisticRegressionAccuracy(nonSwissCompDict)