In [0]:
# The following three lines are required if you are running PySpark using Jupyter notebook with Python3 as kernel 
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Python Spark SQL example").getOrCreate()

In [0]:
dailyDF = spark.read.load("/FileStore/tables/daily_data.txt", format="csv", sep="\t", inferSchema="true",header="true")
dividendDF = spark.read.load("/FileStore/tables/dividend_data.txt", format="csv", sep="\t", inferSchema="true",header="true")

In [0]:
dailyDF.printSchema()

root
 |-- exchange: string (nullable = true)
 |-- stock_symbol: string (nullable = true)
 |-- date: date (nullable = true)
 |-- open_price: double (nullable = true)
 |-- high_price: double (nullable = true)
 |-- low_price: double (nullable = true)
 |-- close_price: double (nullable = true)
 |-- volume: integer (nullable = true)
 |-- adjusted_close_price: double (nullable = true)



In [0]:
dailyDF.count() #No. of rows

Out[25]: 171430

In [0]:
len(dailyDF.columns) #No. of columns

Out[26]: 9

In [0]:
dailyDF.show(10)

+--------+------------+----------+----------+----------+---------+-----------+-------+--------------------+
|exchange|stock_symbol|      date|open_price|high_price|low_price|close_price| volume|adjusted_close_price|
+--------+------------+----------+----------+----------+---------+-----------+-------+--------------------+
|    NYSE|         JEF|2010-02-08|      25.4|     25.49|    24.78|      24.82|1134300|               24.82|
|    NYSE|         JEF|2010-02-05|     24.91|     25.19|    24.08|      25.01|1765200|               25.01|
|    NYSE|         JEF|2010-02-04|     26.01|      26.2|    24.85|      24.85|1414400|               24.85|
|    NYSE|         JEF|2010-02-03|     26.23|     26.76|    26.22|      26.29|1066000|               26.29|
|    NYSE|         JEF|2010-02-02|     26.08|     26.86|    25.78|      26.46|1496400|               26.46|
|    NYSE|         JEF|2010-02-01|     25.61|     26.11|    25.36|      26.11|2381800|               26.11|
|    NYSE|         JEF|2010-

In [0]:
dailyDF.select(dailyDF.stock_symbol,dailyDF.date,dailyDF.volume,dailyDF.adjusted_close_price).show(n=10)

+------------+----------+-------+--------------------+
|stock_symbol|      date| volume|adjusted_close_price|
+------------+----------+-------+--------------------+
|         JEF|2010-02-08|1134300|               24.82|
|         JEF|2010-02-05|1765200|               25.01|
|         JEF|2010-02-04|1414400|               24.85|
|         JEF|2010-02-03|1066000|               26.29|
|         JEF|2010-02-02|1496400|               26.46|
|         JEF|2010-02-01|2381800|               26.11|
|         JEF|2010-01-29|2010000|               25.54|
|         JEF|2010-01-28|1708100|               26.36|
|         JEF|2010-01-27|1929700|               27.14|
|         JEF|2010-01-26|1422100|                26.5|
+------------+----------+-------+--------------------+
only showing top 10 rows



In [0]:
dailyDF.select(dailyDF.stock_symbol,dailyDF.date,dailyDF.volume,dailyDF.adjusted_close_price).filter(dailyDF["volume"] > 2000000).show(n=10)

+------------+----------+-------+--------------------+
|stock_symbol|      date| volume|adjusted_close_price|
+------------+----------+-------+--------------------+
|         JEF|2010-02-01|2381800|               26.11|
|         JEF|2010-01-29|2010000|               25.54|
|         JEF|2010-01-22|4806900|               26.58|
|         JEF|2010-01-21|4037000|                27.0|
|         JEF|2010-01-20|3740600|                26.8|
|         JEF|2010-01-15|3198700|               25.48|
|         JEF|2010-01-14|2090400|               25.82|
|         JEF|2010-01-13|2418900|               25.46|
|         JEF|2010-01-12|3174200|               25.53|
|         JEF|2010-01-08|2182100|               25.98|
+------------+----------+-------+--------------------+
only showing top 10 rows



In [0]:
dividendDF = spark.read.load("/FileStore/tables/dividend_data.txt", format="csv", sep="\t", inferSchema="true",header="true")

In [0]:
dividendDF.printSchema()

root
 |-- exchange: string (nullable = true)
 |-- stock_symbol: string (nullable = true)
 |-- date: date (nullable = true)
 |-- dividends: double (nullable = true)



In [0]:
dividendDF.show()

+--------+------------+----------+---------+
|exchange|stock_symbol|      date|dividends|
+--------+------------+----------+---------+
|    NYSE|         JAH|2009-12-30|    0.075|
|    NYSE|         JAH|2009-09-29|    0.075|
|    NYSE|         JGT|2009-12-11|    0.377|
|    NYSE|         JGT|2009-09-11|    0.377|
|    NYSE|         JGT|2009-06-11|    0.377|
|    NYSE|         JGT|2009-03-11|    0.377|
|    NYSE|         JGT|2008-12-11|    0.377|
|    NYSE|         JGT|2008-09-11|    0.451|
|    NYSE|         JGT|2008-06-11|    0.451|
|    NYSE|         JGT|2008-03-12|    0.451|
|    NYSE|         JGT|2007-12-12|    0.451|
|    NYSE|         JGT|2007-09-12|    0.451|
|    NYSE|         JGT|2007-06-13|    0.451|
|    NYSE|         JKG|2009-12-24|    0.327|
|    NYSE|         JKG|2009-09-23|    0.223|
|    NYSE|         JKG|2009-06-23|    0.177|
|    NYSE|         JKG|2009-03-25|    0.171|
|    NYSE|         JKG|2008-12-29|    0.077|
|    NYSE|         JKG|2008-12-24|     0.34|
|    NYSE|

In [0]:
dividendDF.createOrReplaceTempView("dividend_table")

result = spark.sql("SELECT stock_symbol, COUNT(*) AS dividend_count FROM dividend_table GROUP BY stock_symbol ORDER BY stock_symbol")

In [0]:
result.show()

+------------+--------------+
|stock_symbol|dividend_count|
+------------+--------------+
|         JAH|             2|
|         JBI|            13|
|         JBJ|            13|
|         JBK|            20|
|         JBL|            15|
|         JBN|             5|
|         JBO|            11|
|         JBR|             5|
|         JBT|             5|
|         JCE|            11|
|         JCI|            97|
|         JCP|           114|
|         JDD|            50|
|         JEF|            72|
|         JEQ|             8|
|         JFC|            19|
|         JFP|            58|
|         JFR|            68|
|         JGG|            17|
|         JGT|            11|
+------------+--------------+
only showing top 20 rows



In [0]:
joined_df=dailyDF.join(dividendDF, ['stock_symbol', 'date'], 'inner')

In [0]:
joined_df1=joined_df.select('stock_symbol', 'date', 'volume', 'adjusted_close_price', 'dividends')

In [0]:
joined_df1.show()

+------------+----------+-------+--------------------+---------+
|stock_symbol|      date| volume|adjusted_close_price|dividends|
+------------+----------+-------+--------------------+---------+
|         JEF|2008-05-13|1595800|               17.75|    0.125|
|         JEF|2008-02-13|1158400|               18.73|    0.125|
|         JEF|2007-11-13|1488700|               24.52|    0.125|
|         JEF|2007-08-13|1646400|               24.33|    0.125|
|         JEF|2007-05-11|2019500|                31.3|    0.125|
|         JEF|2007-02-13| 519200|               27.57|    0.125|
|         JEF|2006-11-13| 467000|               28.51|    0.125|
|         JEF|2006-08-11| 385600|               23.93|    0.125|
|         JEF|2006-05-23|1009300|                28.4|    0.063|
|         JEF|2006-02-13| 511200|               25.25|    0.075|
|         JEF|2005-11-10| 658200|               21.65|    0.075|
|         JEF|2005-08-11| 283800|               19.48|     0.06|
|         JEF|2005-05-12|

In [0]:
# Execute Spark SQL query with JOIN
result = dailyDF.join(dividendDF, (dailyDF["date"] == dividendDF["date"]) & (dailyDF["stock_symbol"] == dividendDF["stock_symbol"]), "inner") \
            .select(dailyDF["date"], dailyDF["stock_symbol"], dailyDF["volume"], dailyDF["adjusted_close_price"],dividendDF["dividends"])

# Show the result
result.show()

+----------+------------+-------+--------------------+---------+
|      date|stock_symbol| volume|adjusted_close_price|dividends|
+----------+------------+-------+--------------------+---------+
|2008-05-13|         JEF|1595800|               17.75|    0.125|
|2008-02-13|         JEF|1158400|               18.73|    0.125|
|2007-11-13|         JEF|1488700|               24.52|    0.125|
|2007-08-13|         JEF|1646400|               24.33|    0.125|
|2007-05-11|         JEF|2019500|                31.3|    0.125|
|2007-02-13|         JEF| 519200|               27.57|    0.125|
|2006-11-13|         JEF| 467000|               28.51|    0.125|
|2006-08-11|         JEF| 385600|               23.93|    0.125|
|2006-05-23|         JEF|1009300|                28.4|    0.063|
|2006-02-13|         JEF| 511200|               25.25|    0.075|
|2005-11-10|         JEF| 658200|               21.65|    0.075|
|2005-08-11|         JEF| 283800|               19.48|     0.06|
|2005-05-12|         JEF|

In [0]:
# Specify the path where you want to save the CSV file
output_path = "/FileStore/tables/resultnew.csv"

# Save the DataFrame in comma-separated format
joined_df1.write.csv(output_path, header=True, mode="overwrite")


In [0]:
%fs ls dbfs:/FileStore/tables

path,name,size,modificationTime
dbfs:/FileStore/tables/customers-1.tsv,customers-1.tsv,44551,1703222763000
dbfs:/FileStore/tables/customers-1.txt,customers-1.txt,44624,1703222763000
dbfs:/FileStore/tables/customers-2.tsv,customers-2.tsv,44551,1703223086000
dbfs:/FileStore/tables/customers-2.txt,customers-2.txt,44624,1703223086000
dbfs:/FileStore/tables/customers.tsv,customers.tsv,44551,1703222708000
dbfs:/FileStore/tables/customers.txt,customers.txt,44624,1703222708000
dbfs:/FileStore/tables/daily_data.txt,daily_data.txt,9105715,1703223562000
dbfs:/FileStore/tables/dividend_data.txt,dividend_data.txt,55598,1703223560000
dbfs:/FileStore/tables/products-1.json,products-1.json,3261964,1703223088000
dbfs:/FileStore/tables/products-1.tsv,products-1.tsv,1259724,1703223088000


In [0]:
resultnewDF = spark.read.load("/FileStore/tables/resultnew.csv", format="csv", sep="\t", inferSchema="true",header="true")

In [0]:
resultnewDF.show()

+-------------------------------------------------------+
|stock_symbol,date,volume,adjusted_close_price,dividends|
+-------------------------------------------------------+
|                                   JEF,2008-05-13,15...|
|                                   JEF,2008-02-13,11...|
|                                   JEF,2007-11-13,14...|
|                                   JEF,2007-08-13,16...|
|                                   JEF,2007-05-11,20...|
|                                   JEF,2007-02-13,51...|
|                                   JEF,2006-11-13,46...|
|                                   JEF,2006-08-11,38...|
|                                   JEF,2006-05-23,10...|
|                                   JEF,2006-02-13,51...|
|                                   JEF,2005-11-10,65...|
|                                   JEF,2005-08-11,28...|
|                                   JEF,2005-05-12,13...|
|                                   JEF,2005-02-11,82...|
|             