<a href="https://colab.research.google.com/github/ppaunovski/pyspark-pidp/blob/master/pyspark_pidp.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name, expr
from pyspark.sql.functions import udf
from pyspark.sql.types import DateType, StringType
from pyspark.sql.types import FloatType
from datetime import datetime
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col
import datetime
from pyspark.sql.functions import corr


In [None]:
# Create the Spark session used by the rest of the notebook (or reuse a
# running one).
session_builder = SparkSession.builder.appName("Stock Price Analysis")
spark = session_builder.getOrCreate()

# Load the CSV data, treating the first line of each file as the header.
# Every column is read as a string because no schema is supplied here.
raw_stocks = spark.read.csv('gs://pidp-pajo-bucket/stocks', header=True)

# Record the source file of each row in a 'Ticker' column.
stocks = raw_stocks.withColumn('Ticker', input_file_name())
stocks.show()

In [None]:
def _ticker_from_path(path):
    """Return the last path component of *path*, cut at its first '.'."""
    return path.split('/')[-1].split('.')[0]


def _parse_iso_date(text):
    """Parse a 'YYYY-MM-DD' string into a ``datetime.date``.

    A missing value (None) is passed through as None so that null cells do
    not abort the job. Text in any other format still raises ValueError.
    """
    if text is None:
        return None
    # The imports cell ends with `import datetime`, which rebinds the name
    # `datetime` to the module; the class must therefore be reached as
    # `datetime.datetime`. Calling `datetime.strptime` raises AttributeError.
    return datetime.datetime.strptime(text, "%Y-%m-%d").date()


# UDF: file path -> ticker string (StringType is also udf's default type).
ticker_parser = udf(_ticker_from_path, StringType())
# UDF: 'YYYY-MM-DD' string -> date.
date_parser = udf(_parse_iso_date, DateType())

def num_parser(value):
    """Convert a raw price value to ``float``, or None when it cannot be.

    Args:
        value: A string (optionally padded with whitespace and/or wrapped in
            '$' signs), a float, an int, or anything else.

    Returns:
        The numeric value as a float. None is returned for unsupported types
        (including None itself) and for strings that do not contain a
        number, such as an empty field, so a single bad cell does not raise
        inside the UDF that wraps this function.
    """
    if isinstance(value, str):
        # Trim whitespace first so a padded "$" sign is still removed.
        cleaned = value.strip().strip("$")
        try:
            return float(cleaned)
        except ValueError:
            return None
    if isinstance(value, float):
        return value
    if isinstance(value, int):
        return float(value)
    return None

def _int_or_none(value):
    """Convert *value* with ``int()``; a missing value (None) stays None.

    Mirrors num_parser, which also returns None for None, so a null cell
    does not raise TypeError inside the UDF.
    """
    if value is None:
        return None
    return int(value)


# UDF wrapping num_parser; results are stored as FloatType.
parser_number = udf(num_parser, FloatType())
# UDF converting a value to an integer; results are stored as IntegerType.
parse_int = udf(_int_or_none, IntegerType())

In [None]:
# Reduce the source file path stored in 'Ticker' via ticker_parser and turn
# the 'Date' strings into dates.
stocks = stocks.withColumn('Ticker', ticker_parser(stocks['Ticker']))
stocks = stocks.withColumn("Date", date_parser(stocks['Date']))

# Run every price column through parser_number, replacing it in place.
for price_column in ("Open", "Adj Close", "Close", "Low", "High"):
    stocks = stocks.withColumn(price_column, parser_number(stocks[price_column]))

# Volume goes through the integer parser.
stocks = stocks.withColumn("Volume", parse_int(stocks['Volume']))

# Fix the final column order.
stocks = stocks.select(
    "Ticker", "Date", "Volume", "Open", "Low", "High", "Close", "Adj Close"
)

In [None]:
# Display a preview of `stocks` after the column conversions above.
stocks.show()

In [None]:
start_time = datetime.datetime.now()

# Two projections of the same table, differing only in the suffix given to
# the ticker and closing-price columns.
first_leg = stocks.select(
    col('Ticker').alias('Ticker_1'),
    'Date',
    col('Close').alias('Close_1'),
)
second_leg = stocks.select(
    col('Ticker').alias('Ticker_2'),
    'Date',
    col('Close').alias('Close_2'),
)

# Pair up rows that share a date.
paired_by_date = first_leg.join(second_leg, on='Date', how='inner')

# The strict '<' drops rows pairing a ticker with itself and keeps only one
# ordering of each pair of distinct tickers.
self_join_data = paired_by_date.filter(col('Ticker_1') < col('Ticker_2'))
self_join_data.show(5)

# NOTE(review): Spark evaluates lazily, so this interval presumably covers
# only the work show(5) needs rather than the full join; confirm that this
# is the intended measurement.
stop_time = datetime.datetime.now()
diff = stop_time - start_time

print('Time taken to self join: ', diff)

In [None]:
# Count the rows of the self-joined table and report the total.
pair_row_count = self_join_data.count()
print('Total number of observations after self-join: ', pair_row_count)

In [None]:
start_time = datetime.datetime.now()

# Correlation between the two closing-price columns, computed once per
# (Ticker_1, Ticker_2) group.
close_correlation = corr('Close_1', 'Close_2').alias('correlation')
pairs_grouped = self_join_data.groupBy('Ticker_1', 'Ticker_2')
stock_corr = pairs_grouped.agg(close_correlation)
stock_corr.show()

stop_time = datetime.datetime.now()
diff = stop_time - start_time

print('Time taken to calculate correlation: ', diff)