In [1]:
# Mount Google Drive: the input data and the exported CSV both live under MyDrive.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [28]:
!pip install findspark pyspark



In [2]:
import findspark
findspark.init()
import pyspark

from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

# Build the session (and its SparkContext) with the desired config in one go.
# spark.driver.memory is only honoured when the driver JVM is launched; creating
# a bare SparkContext() first would start the JVM with defaults, and the
# builder's getOrCreate() would then just reuse that context.
ss = (SparkSession.builder
      .config("spark.driver.memory", "8g")
      .getOrCreate())
# Kept for compatibility with code that refers to the SparkContext as `sp`.
sp = ss.sparkContext

import pyspark.sql.functions as f

In [3]:
# Read without a header option, so Spark names the columns _c0, _c1, ...
MS1_PATH = '/content/drive/MyDrive/BigDataManagement/stocks-data/MS1.txt'
df = ss.read.csv(MS1_PATH)

In [4]:
from pyspark.sql.functions import date_format

# Raw columns: _c0 stock name, _c1 date ('MM/dd/yyyy'), _c2 price, _c3 volume.
# Spark datetime patterns are case-sensitive: `MM` is month-of-year, `mm` is
# minute-of-hour. Parsing with 'mm' would put the month into the minute field
# (leaving the real month at January) and only print correctly if the output
# pattern made the same mistake.
newdf = (df.select(df._c0.alias('stock_name'),
                   date_format(f.to_timestamp('_c1', 'MM/dd/yyyy'), 'yyyy-MM-dd').alias('date'),
                   df._c2.alias('price'),
                   df._c3.alias('volume'))
           # Keep the 2016-2020 period only.
           .filter((f.year('date') >= 2016) & (f.year('date') <= 2020))
           # At most one observation per stock per day.
           .dropDuplicates(['stock_name', 'date']))

In [5]:
# Every distinct date in the filtered data, sorted by its date string;
# the analysis is restricted to the first 1000 of them.
dates = sorted(r.date for r in newdf.select('date').distinct().collect())
dates2 = dates[:1000]

In [6]:
# Rank stocks by how many rows they have on the dates in `dates2` and keep the
# 1000 best-covered ones.
counts_in_window = (newdf.where(newdf.date.isin(dates2))
                         .groupBy('stock_name')
                         .count()
                         .orderBy('count', ascending=False))
stocks = [r.stock_name for r in counts_in_window.take(1000)]

In [7]:
N_STOCKS = 1000  # upper bound on the number of stocks analysed
chosen_stocks = stocks[:N_STOCKS]

In [8]:
# Restrict to the chosen stocks and to the date window spanned by `dates2`.
# The date strings are 'yyyy-...-dd' shaped, so lexicographic comparison is
# date order; `between` is inclusive on both ends.
# (`df` and `newdf` are never cached, so there is nothing to unpersist here.)
df_small = (newdf
            .filter(newdf.stock_name.isin(chosen_stocks))
            .filter(newdf.date.between(dates2[0], dates2[-1]))
            .cache())

DataFrame[stock_name: string, date: string, price: string, volume: string]

In [9]:
# Add `date_existent`, a copy of `date`. After the left join onto the full
# stock x date grid it is non-null only on rows that had a real observation,
# which is what the forward/backward-fill windows look for.
# withColumn returns a new plan, so cache that one and release the frame cached
# earlier (no action has run on it yet). Otherwise a later
# `df_small.unpersist()` would target the uncached plan and never free the cache.
df_small_obs = df_small
df_small = df_small_obs.withColumn('date_existent', f.col('date')).cache()
df_small_obs.unpersist();

----

In [10]:
from pyspark.sql.types import ArrayType, StringType

In [11]:
def func():
    """Return the module-level ``dates2`` list (the trading-date grid).

    Takes no arguments so it can be wrapped as a zero-argument Spark UDF.
    """
    grid_dates = dates2
    return grid_dates

In [12]:
# Spark UDF wrapping `func`; yields an array<string> column of the grid dates.
func_udf = f.udf(func, returnType=ArrayType(StringType()))

In [13]:
# Full stock x date grid: every stock in `df_small` paired with every date in
# `dates2`. The dates are embedded as a literal array column, which avoids
# calling a Python UDF once per stock just to return a constant list.
date_grid = f.array(*[f.lit(d) for d in dates2])
df_base = (df_small.select('stock_name').distinct()
           .withColumn('date', f.explode(date_grid)))

In [14]:
# Left-join the observations onto the full grid; grid points without an
# observation get nulls in the columns coming from df_small.
df_full = df_base.join(df_small, on=['stock_name', 'date'], how="leftouter")

In [15]:
# Caching is lazy: run an action so df_full is materialized (reading df_small
# from its cache) before df_small's cache is released. Unpersisting first
# would mean df_small is recomputed from its source for each use in df_full.
df_full = df_full.cache()
df_full.count()
df_small.unpersist();

----

In [16]:
from pyspark.sql import Window
import sys

In [17]:
# Per-stock windows in date order:
#   window_ff - every row up to and including the current one (forward fill)
#   window_bf - the current row and every row after it (backward fill)
# Window.unbounded*/currentRow are the documented frame boundaries; they replace
# the +/-sys.maxsize sentinels without changing the frame.
window_ff = (Window.partitionBy('stock_name')
             .orderBy('date')
             .rowsBetween(Window.unboundedPreceding, Window.currentRow))

window_bf = (Window.partitionBy('stock_name')
             .orderBy('date')
             .rowsBetween(Window.currentRow, Window.unboundedFollowing))

In [26]:
# Nearest observed price/date looking backwards (ff) and forwards (bf) within
# each stock's date-ordered series; nulls are skipped.
read_last, readdate_last = (f.last(df_full[c], ignorenulls=True).over(window_ff)
                            for c in ('price', 'date_existent'))
read_next, readdate_next = (f.first(df_full[c], ignorenulls=True).over(window_bf)
                            for c in ('price', 'date_existent'))

df_filled = df_full
for new_col, fill_expr in [('price_ff', read_last),
                           ('date_ff', readdate_last),
                           ('price_bf', read_next),
                           ('date_bf', readdate_next)]:
    df_filled = df_filled.withColumn(new_col, fill_expr)

In [27]:
from pyspark.sql.types import FloatType
import datetime

def interpol(x, x_prev, x_next, y_prev, y_next, y):
    """Fill a price by linear interpolation over calendar days.

    Args:
        x: date of the current row, 'YYYY-MM-DD' string.
        x_prev: date of the last observation at or before ``x`` (None if none).
        x_next: date of the first observation at or after ``x`` (None if none).
        y_prev: price observed on ``x_prev`` (numeric string or None).
        y_next: price observed on ``x_next`` (numeric string or None).
        y: price observed on ``x`` itself (numeric string or None).

    Returns:
        float: the observed price when ``x`` is its own neighbour, the linear
        interpolation between the two neighbours when both prices exist,
        otherwise whichever neighbouring price exists. None when no price is
        available at all.
    """
    def _days_between(start, end):
        # Whole calendar days from `start` to `end` ('YYYY-MM-DD' strings).
        fmt = '%Y-%m-%d'
        return (datetime.datetime.strptime(end, fmt)
                - datetime.datetime.strptime(start, fmt)).days

    if x_prev == x_next:
        # Both neighbours are the same date: the row itself is an observation
        # (or both are None: the series has no observation at all).
        return None if y is None else float(y)

    if y_prev is not None and y_next is not None:
        slope = (float(y_next) - float(y_prev)) / _days_between(x_prev, x_next)
        return float(y_prev) + slope * _days_between(x_prev, x)

    # Only one side exists: forward fill at the end, backward fill at the start.
    nearest = y_prev if y_prev is not None else y_next
    return None if nearest is None else float(nearest)

# Spark UDF wrapping `interpol`; its result column is FloatType.
interpol_udf = f.udf(interpol, returnType=FloatType())

In [28]:
# Replace the raw price by the interpolated one and drop the helper columns.
# Final schema: stock_name, date, volume, price.
df_filled = (df_filled
             .withColumn('price_interpol',
                         interpol_udf('date', 'date_ff', 'date_bf', 'price_ff', 'price_bf', 'price'))
             .drop('date_existent', 'date_ff', 'date_bf', 'price', 'price_bf', 'price_ff')
             .withColumnRenamed('price_interpol', 'price'))
df_filled

DataFrame[stock_name: string, date: string, volume: string, price: float]

In [29]:
# Rows whose price is still missing after interpolation.
df_filled.where(f.col('price').isNull()).count()

0

In [30]:
# Sanity check: the filled frame must be the complete stock x date grid,
# i.e. one row per chosen stock per date in `dates2`.
n_filled_rows = df_filled.count()
assert n_filled_rows == len(chosen_stocks) * len(dates2), n_filled_rows
n_filled_rows

1000000

In [31]:
import pandas as pd

In [32]:
# Export in a deterministic (stock, date) order. toPandas() already collects
# everything onto the driver, so repartitioning to one partition first is unnecessary.
df_filled.orderBy('stock_name', 'date').toPandas().to_csv('/content/drive/MyDrive/BigDataManagement/stocks-ta3.csv')