In [1]:
import findspark
findspark.init('/usr/local/spark')
import pyspark

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("ETL Case Study").getOrCreate()

### Reading the datasets into spark dataframes and assigning column names as per data dictionary

In [3]:
df1 = spark.read.load("NYSE_daily.tsv", format="csv", sep="\t", inferSchema="true")
df1.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: double (nullable = true)
 |-- _c4: double (nullable = true)
 |-- _c5: double (nullable = true)
 |-- _c6: double (nullable = true)
 |-- _c7: integer (nullable = true)
 |-- _c8: double (nullable = true)



In [4]:
cols = ["exchange", "stock_symbol", "date", "open_price", "high_price", "low_price", "close_price", "volume","adjusted_close","price"]

In [5]:
daily_df = df1 
for i in range(len(cols)):
    c = "_c"+str(i)
#     print(c)
    daily_df = daily_df.withColumnRenamed(c, cols[i])
#     daily_df.printSchema()

### Print schema

In [6]:
daily_df.printSchema()

root
 |-- exchange: string (nullable = true)
 |-- stock_symbol: string (nullable = true)
 |-- date: string (nullable = true)
 |-- open_price: double (nullable = true)
 |-- high_price: double (nullable = true)
 |-- low_price: double (nullable = true)
 |-- close_price: double (nullable = true)
 |-- volume: integer (nullable = true)
 |-- adjusted_close: double (nullable = true)



### Show top 10 rows

In [7]:
daily_df.show(10)

+--------+------------+---------+----------+----------+---------+-----------+-------+--------------+
|exchange|stock_symbol|     date|open_price|high_price|low_price|close_price| volume|adjusted_close|
+--------+------------+---------+----------+----------+---------+-----------+-------+--------------+
|    NYSE|         JEF| 2/8/2010|      25.4|     25.49|    24.78|      24.82|1134300|         24.82|
|    NYSE|         JEF| 2/5/2010|     24.91|     25.19|    24.08|      25.01|1765200|         25.01|
|    NYSE|         JEF| 2/4/2010|     26.01|      26.2|    24.85|      24.85|1414400|         24.85|
|    NYSE|         JEF| 2/3/2010|     26.23|     26.76|    26.22|      26.29|1066000|         26.29|
|    NYSE|         JEF| 2/2/2010|     26.08|     26.86|    25.78|      26.46|1496400|         26.46|
|    NYSE|         JEF| 2/1/2010|     25.61|     26.11|    25.36|      26.11|2381800|         26.11|
|    NYSE|         JEF|1/29/2010|     26.57|      26.8|    25.41|      25.54|2010000|      

In [8]:
df2 = spark.read.load("NYSE_dividends.tsv", format="csv", sep="\t", inferSchema="True")
df2.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: double (nullable = true)



In [9]:
cols2 = ['exchange', 'stock_symbol','date', 'dividends']

In [10]:
div_df = df2
for i in range(len(cols2)):
    c = "_c"+str(i)
#     print(c)
    div_df = div_df.withColumnRenamed(c, cols2[i])
#     daily_df.printSchema()

In [11]:
div_df.show(10)

+--------+------------+----------+---------+
|exchange|stock_symbol|      date|dividends|
+--------+------------+----------+---------+
|    NYSE|         JAH|12/30/2009|    0.075|
|    NYSE|         JAH| 9/29/2009|    0.075|
|    NYSE|         JGT|12/11/2009|    0.377|
|    NYSE|         JGT| 9/11/2009|    0.377|
|    NYSE|         JGT| 6/11/2009|    0.377|
|    NYSE|         JGT| 3/11/2009|    0.377|
|    NYSE|         JGT|12/11/2008|    0.377|
|    NYSE|         JGT| 9/11/2008|    0.451|
|    NYSE|         JGT| 6/11/2008|    0.451|
|    NYSE|         JGT| 3/12/2008|    0.451|
+--------+------------+----------+---------+
only showing top 10 rows



### Creating template view from dataframe

In [12]:
daily_df.createOrReplaceTempView("daily")
div_df.createOrReplaceTempView("div")

### 1. List the companies which have the stock close price more than or equal to 200 and stock volume more than or equal to 10 million.

In [13]:
q1 = spark.sql("Select * from daily where close_price >= 200 and volume >= 10000000")
q1.show()

+--------+------------+----------+----------+----------+---------+-----------+--------+--------------+
|exchange|stock_symbol|      date|open_price|high_price|low_price|close_price|  volume|adjusted_close|
+--------+------------+----------+----------+----------+---------+-----------+--------+--------------+
|    NYSE|        JNPR| 11/3/2000|     198.0|    216.88|   196.25|     216.13|13424800|        216.13|
|    NYSE|        JNPR|10/19/2000|    229.13|    234.31|    220.0|     232.58|11323800|        232.58|
|    NYSE|        JNPR|10/18/2000|    219.38|     235.0|    212.5|     213.88|15463100|        213.88|
|    NYSE|        JNPR|10/17/2000|    241.75|    241.81|    224.0|     229.19|16734200|        229.19|
|    NYSE|        JNPR|10/16/2000|    226.75|     244.5|    224.0|      243.0|17288400|         243.0|
|    NYSE|        JNPR|10/13/2000|    201.75|     229.5|   201.63|      228.5|19565000|         228.5|
|    NYSE|        JNPR|10/11/2000|     201.5|    219.44|   196.19|      2

### 2. List the companies that have given dividends more than 50 times. The list should include the number of times they have given dividends.

In [14]:
q2 = spark.sql("Select stock_symbol, count(*) AS Count from div group by stock_symbol")
q2.where(q2.Count>50).show()

+------------+-----+
|stock_symbol|Count|
+------------+-----+
|         JCP|  114|
|         JEF|   72|
|         JPM|  104|
|         JRO|   63|
|         JFP|   58|
|         JHI|   99|
|         JNJ|  160|
|         JHS|   88|
|         JTP|   91|
|         JOE|   51|
|         JQC|   55|
|         JHP|   85|
|         JPS|   89|
|         JPC|   60|
|         JCI|   97|
|         JFR|   68|
|         JWN|   81|
+------------+-----+



### 3.List the companies along with their close price, dividends and the date when they gave dividends of more than 0.01 when their daily close price was more than or equal to 100 sorted by dividends in ascending order.

In [20]:
q3 = spark.sql("Select daily.stock_symbol, close_price, dividends from daily JOIN div ON daily.stock_symbol = div.stock_symbol where dividends > 0.01 and close_price >= 100 ORDER BY dividends")
q3.show()

+------------+-----------+---------+
|stock_symbol|close_price|dividends|
+------------+-----------+---------+
|         JNJ|     102.01|  0.01042|
|         JNJ|     100.14|  0.01042|
|         JNJ|     103.06|  0.01042|
|         JNJ|     102.01|  0.01042|
|         JNJ|     103.06|  0.01042|
|         JNJ|      101.8|  0.01042|
|         JNJ|     103.06|  0.01042|
|         JNJ|      101.8|  0.01042|
|         JNJ|     103.06|  0.01042|
|         JNJ|      101.8|  0.01042|
|         JNJ|     103.06|  0.01042|
|         JNJ|      101.8|  0.01042|
|         JNJ|      103.1|  0.01042|
|         JNJ|     102.01|  0.01042|
|         JNJ|      103.1|  0.01042|
|         JNJ|     102.01|  0.01042|
|         JNJ|      103.1|  0.01042|
|         JNJ|     102.19|  0.01042|
|         JNJ|     100.14|  0.01042|
|         JNJ|     102.19|  0.01042|
+------------+-----------+---------+
only showing top 20 rows



In [21]:
type(q3)

pyspark.sql.dataframe.DataFrame

### 4. Save the above lists as comma separated files.

In [24]:
q1.write.csv('q1.csv')

In [25]:
q2.write.csv('q2_csv')

In [23]:
q3.write.csv('q3_csv')