In [1]:
import findspark
findspark.init()

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.getOrCreate()

In [4]:
df = spark.sql("select 'spark' as hello")
df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



In [5]:
from pyspark.sql.types import StructType, StructField, DateType, DecimalType , IntegerType, StringType


In [6]:
spark = SparkSession.builder.master("local[*]").appName("batch pipelines").getOrCreate()

In [7]:
spark.sparkContext.setLogLevel("WARN")

In [8]:
source_data_file = "C:\data\*"

In [9]:
schema = StructType([
    StructField("symbol", StringType()),
    StructField("date", DateType()),
    StructField("open", DecimalType(precision=38, scale=2)),
    StructField("high", DecimalType(precision=38, scale=2)),
    StructField("low", DecimalType(precision=38, scale=2)),
    StructField("close", DecimalType(precision=38, scale=2)),
    StructField("volume", IntegerType()),
    StructField("adj_close", DecimalType(precision=38, scale=2))
])

In [10]:
data = spark.read.csv(source_data_file, schema=schema).cache()

In [11]:
count = data.count()

In [12]:
data.show()

+------+----------+-----+-----+-----+-----+------+---------+
|symbol|      date| open| high|  low|close|volume|adj_close|
+------+----------+-----+-----+-----+-----+------+---------+
|  null|      null| null| null| null| null|  null|     null|
|    IF|2016-12-16| 6.32| 6.36| 6.23| 6.31| 11100|     6.31|
|    IF|2016-12-15| 6.40| 6.41| 6.36| 6.40|  8500|     6.40|
|   ISL|2016-12-16|16.08|16.08|16.08|16.08|   500|    16.08|
|   ISL|2016-12-15|16.44|16.44|16.20|16.22|  6100|    16.22|
|   FCO|2016-12-16| 7.96| 8.01| 7.96| 7.99| 15600|     7.99|
|   FCO|2016-12-15| 8.03| 8.06| 8.01| 8.02| 21400|     8.02|
|   ACU|2016-12-16|23.11|23.80|23.11|23.80|  2800|    23.80|
|   ACU|2016-12-15|22.56|23.10|22.50|23.01|  5100|    23.01|
|   FAX|2016-12-16| 4.75| 4.76| 4.73| 4.74|700800|     4.74|
|   FAX|2016-12-15| 4.81| 4.81| 4.78| 4.78|625100|     4.78|
|  AIII|2016-12-16| 1.24| 1.28| 1.02| 1.07| 83400|     1.07|
|  AIII|2016-12-15| 1.27| 1.27| 1.15| 1.18| 11500|     1.18|
|   IAF|2016-12-16| 5.56

In [13]:
print("Data points from files count: {}".format(count))

Data points from files count: 2966121


In [14]:
from pyspark.sql.functions import udf

In [18]:
def extract_month(date):
    if date is not None:
        return int(date.month)

In [19]:
def extract_year(date):
    if date is not None:
        return int(date.year)

In [20]:
def extract_day(date):
    if date is not None:
        return int(date.day)

In [21]:
udf_month = udf(extract_month, IntegerType())
udf_year = udf(extract_year, IntegerType())
udf_day = udf(extract_day, IntegerType())

In [22]:
tup = ("close", "adj_close")
df_with_month_year = data \
    .withColumn("month", udf_month("date")) \
    .withColumn("year", udf_year("date")) \
    .withColumn("day", udf_day("date"))

In [23]:
df_with_month_year.show()

+------+----------+-----+-----+-----+-----+------+---------+-----+----+----+
|symbol|      date| open| high|  low|close|volume|adj_close|month|year| day|
+------+----------+-----+-----+-----+-----+------+---------+-----+----+----+
|  null|      null| null| null| null| null|  null|     null| null|null|null|
|    IF|2016-12-16| 6.32| 6.36| 6.23| 6.31| 11100|     6.31|   12|2016|  16|
|    IF|2016-12-15| 6.40| 6.41| 6.36| 6.40|  8500|     6.40|   12|2016|  15|
|   ISL|2016-12-16|16.08|16.08|16.08|16.08|   500|    16.08|   12|2016|  16|
|   ISL|2016-12-15|16.44|16.44|16.20|16.22|  6100|    16.22|   12|2016|  15|
|   FCO|2016-12-16| 7.96| 8.01| 7.96| 7.99| 15600|     7.99|   12|2016|  16|
|   FCO|2016-12-15| 8.03| 8.06| 8.01| 8.02| 21400|     8.02|   12|2016|  15|
|   ACU|2016-12-16|23.11|23.80|23.11|23.80|  2800|    23.80|   12|2016|  16|
|   ACU|2016-12-15|22.56|23.10|22.50|23.01|  5100|    23.01|   12|2016|  15|
|   FAX|2016-12-16| 4.75| 4.76| 4.73| 4.74|700800|     4.74|   12|2016|  16|

In [24]:
df_with_month_year_count = df_with_month_year.count()

In [25]:
print("Data points after adding month, year, day: {}".format(df_with_month_year_count))

Data points after adding month, year, day: 2966121


In [26]:
df2 = df_with_month_year.filter("symbol is not NULL")

In [27]:
df2.show()

+------+----------+-----+-----+-----+-----+------+---------+-----+----+---+
|symbol|      date| open| high|  low|close|volume|adj_close|month|year|day|
+------+----------+-----+-----+-----+-----+------+---------+-----+----+---+
|    IF|2016-12-16| 6.32| 6.36| 6.23| 6.31| 11100|     6.31|   12|2016| 16|
|    IF|2016-12-15| 6.40| 6.41| 6.36| 6.40|  8500|     6.40|   12|2016| 15|
|   ISL|2016-12-16|16.08|16.08|16.08|16.08|   500|    16.08|   12|2016| 16|
|   ISL|2016-12-15|16.44|16.44|16.20|16.22|  6100|    16.22|   12|2016| 15|
|   FCO|2016-12-16| 7.96| 8.01| 7.96| 7.99| 15600|     7.99|   12|2016| 16|
|   FCO|2016-12-15| 8.03| 8.06| 8.01| 8.02| 21400|     8.02|   12|2016| 15|
|   ACU|2016-12-16|23.11|23.80|23.11|23.80|  2800|    23.80|   12|2016| 16|
|   ACU|2016-12-15|22.56|23.10|22.50|23.01|  5100|    23.01|   12|2016| 15|
|   FAX|2016-12-16| 4.75| 4.76| 4.73| 4.74|700800|     4.74|   12|2016| 16|
|   FAX|2016-12-15| 4.81| 4.81| 4.78| 4.78|625100|     4.78|   12|2016| 15|
|  AIII|2016

In [28]:
df2_count = df2.count()
print("Data points remaining after removing nulls: {}".format(df2_count))

Data points remaining after removing nulls: 2965870


In [29]:
print("Removed {} nulls".format(df_with_month_year_count - df2_count))

Removed 251 nulls


In [30]:
from pyspark.sql.functions import avg

In [31]:
monthly_avg_close_df = df2.groupBy(df2.month,df2.year).agg(avg("close").alias("average_month_close")).orderBy(df2.year,df2.month)

monthly_avg_close_df.show()



+-----+----+-------------------+
|month|year|average_month_close|
+-----+----+-------------------+
|   12|2015|          28.854235|
|    1|2016|          26.868989|
|    2|2016|          26.402522|
|    3|2016|          28.388157|
|    4|2016|          29.081379|
|    5|2016|          29.032151|
|    6|2016|          29.484503|
|    7|2016|          30.366564|
|    8|2016|          33.058425|
|    9|2016|          35.427533|
|   10|2016|          35.053195|
|   11|2016|          35.915486|
|   12|2016|          43.785477|
+-----+----+-------------------+



In [32]:
adj_close_diff_than_close = df2.filter("close != adj_close").groupBy(df2.month,df2.year).count().orderBy(df2.year,df2.month)

adj_close_diff_than_close.show()

+-----+----+-----+
|month|year|count|
+-----+----+-----+
|   12|2015|   34|
|    1|2016|  835|
|    2|2016| 1353|
|    3|2016| 1390|
|    4|2016|  920|
|    5|2016| 1490|
|    6|2016| 1495|
|    7|2016|  925|
|    8|2016| 1445|
|    9|2016| 1285|
|   10|2016|  906|
|   11|2016| 1366|
|   12|2016| 1514|
+-----+----+-----+



In [None]:
df_diff = df2.filter("close != adj_close")

In [None]:
df_diff.show()

In [82]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, DateType, DecimalType , IntegerType, StringType
def finclose(close,adj_close):
    if close == adj_close:
        return close
    elif close > adj_close:
        return close
    else:
        return adj_close

In [83]:
#tup = ("close", "adj_close")
udf_finalclose = udf(finclose, DecimalType())

In [84]:
df_with_month_year = data.withColumn('fin_close', udf_finalclose('close','adj_close')) 
   

In [85]:
df_with_month_year.show()

+------+----------+-----+-----+-----+-----+------+---------+---------+
|symbol|      date| open| high|  low|close|volume|adj_close|fin_close|
+------+----------+-----+-----+-----+-----+------+---------+---------+
|  null|      null| null| null| null| null|  null|     null|     null|
|    IF|2016-12-16| 6.32| 6.36| 6.23| 6.31| 11100|     6.31|        6|
|    IF|2016-12-15| 6.40| 6.41| 6.36| 6.40|  8500|     6.40|        6|
|   ISL|2016-12-16|16.08|16.08|16.08|16.08|   500|    16.08|       16|
|   ISL|2016-12-15|16.44|16.44|16.20|16.22|  6100|    16.22|       16|
|   FCO|2016-12-16| 7.96| 8.01| 7.96| 7.99| 15600|     7.99|        8|
|   FCO|2016-12-15| 8.03| 8.06| 8.01| 8.02| 21400|     8.02|        8|
|   ACU|2016-12-16|23.11|23.80|23.11|23.80|  2800|    23.80|       24|
|   ACU|2016-12-15|22.56|23.10|22.50|23.01|  5100|    23.01|       23|
|   FAX|2016-12-16| 4.75| 4.76| 4.73| 4.74|700800|     4.74|        5|
|   FAX|2016-12-15| 4.81| 4.81| 4.78| 4.78|625100|     4.78|        5|
|  AII