## Getting Spark Ready

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

In [None]:
import os

# Export the JDK location and the unpacked Spark directory in a single call.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.1-bin-hadoop3.2",
})

In [None]:
import findspark

# Keep this call ahead of the pyspark import below: the notebook never
# pip-installs pyspark, so the import presumably depends on findspark
# having located SPARK_HOME first.
findspark.init()

from pyspark.sql import SparkSession

# Local session that uses every available core.
session_builder = SparkSession.builder.master("local[*]")
spark = session_builder.getOrCreate()

# Eager evaluation makes DataFrames render as formatted tables in the notebook.
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)

# Last expression of the cell: display the session summary.
spark

## Part 1



#### Read last day and preprocess

The column names are in Persian, which causes problems in Spark, so they are all renamed to English. Each column is also cast to an appropriate type (int, long, float, date, ...).

We can see Schema and the table after the preprocessing

In [None]:
from pyspark.sql.functions import col, split, substring, length, to_date, to_timestamp

# Read last day
df = spark.read.option("header", "true").csv("/content/drive/MyDrive/BD3/stock/1399-12-27.csv")

# Each Persian header is copied to an English-named, typed column and the
# original is dropped. New columns are appended at the end, so the surviving
# `date` column stays first.
df = (
    df.withColumn("symbol", col('نماد')).drop('نماد')
    .withColumn("name", col('نام')).drop('نام')
    .withColumn("quantity", col('تعداد').cast('int')).drop('تعداد')
    .withColumn("volume", col('حجم').cast('int')).drop('حجم')
    # `value` is cast to long rather than int like the other numeric columns.
    .withColumn("value", col('ارزش').cast('long')).drop('ارزش')
    .withColumn("yesterday", col('دیروز').cast('int')).drop('دیروز')
    .withColumn("first", col('اولین').cast('int')).drop('اولین')
    .withColumn("last_trade_value", col('آخرین معامله - مقدار').cast('int')).drop('آخرین معامله - مقدار')
    .withColumn("last_trade_change", col('آخرین معامله - تغییر').cast('int')).drop('آخرین معامله - تغییر')
    .withColumn("last_trade_percent", col('آخرین معامله - درصد').cast('float')).drop('آخرین معامله - درصد')
    .withColumn("last_price_value", col('قیمت پایانی - مقدار').cast('int')).drop('قیمت پایانی - مقدار')
    .withColumn("last_price_change", col('قیمت پایانی - تغییر').cast('int')).drop('قیمت پایانی - تغییر')
    .withColumn("last_price_percent", col('قیمت پایانی - درصد').cast('float')).drop('قیمت پایانی - درصد')
    .withColumn("lowest", col('کمترین').cast('int')).drop('کمترین')
    .withColumn("highest", col('بیشترین').cast('int')).drop('بیشترین')
    # `_c0` is the unnamed first CSV column (presumably a row index); not needed.
    .drop("_c0")
    # Dates are written year-first (the file itself is named 1399-12-27.csv),
    # so parse with 'yyyy-MM-dd'. The previous 'dd-MM-yyyy' pattern did not
    # match the data and left the `date` column unparsed.
    .withColumn("date", to_date('date', 'yyyy-MM-dd'))
)

df.printSchema()
df.show(5, truncate = False)

root
 |-- date: date (nullable = true)
 |-- symbol: string (nullable = true)
 |-- name: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- volume: integer (nullable = true)
 |-- value: integer (nullable = true)
 |-- yesterday: integer (nullable = true)
 |-- first: integer (nullable = true)
 |-- last_trade_value: integer (nullable = true)
 |-- last_trade_change: integer (nullable = true)
 |-- last_trade_percent: float (nullable = true)
 |-- last_price_value: integer (nullable = true)
 |-- last_price_change: integer (nullable = true)
 |-- last_price_percent: float (nullable = true)
 |-- lowest: integer (nullable = true)
 |-- highest: integer (nullable = true)

+----+--------+-----------------------------+--------+---------+-----+---------+------+----------------+-----------------+------------------+----------------+-----------------+------------------+------+-------+
|date|symbol  |name                         |quantity|volume   |value|yesterday|first |last_trade_valu

#### Dataframe

To get the cheapest or priciest stocks, we order the table by 'last_price_value' and show the first 5 rows

In [None]:
# Columns displayed for both rankings.
price_columns = ['name', 'last_price_value']

# Five most expensive stocks by closing price
df.sort(col('last_price_value').desc()).select(*price_columns).show(5, truncate=False)

# Five cheapest stocks by closing price
df.sort(col('last_price_value').asc()).select(*price_columns).show(5, truncate=False)

+-----------------------------+----------------+
|name                         |last_price_value|
+-----------------------------+----------------+
|مشاركت دولتي1-شرايط خاص001026|990000          |
|سپنتا                        |410750          |
|خوراك‌  دام‌ پارس‌           |400550          |
|صنايع‌جوشكاب‌يزد             |360970          |
|پتروشيمي فناوران             |327430          |
+-----------------------------+----------------+
only showing top 5 rows

+------------------------------+----------------+
|name                          |last_price_value|
+------------------------------+----------------+
|صندوق س. با درآمد ثابت كيان   |1               |
|صندوق س.اعتماد آفرين پارسيان-د|1               |
|صندوق س ياقوت آگاه-ثابت       |1               |
|صندوق س. با درآمد ثابت كمند   |1               |
|صندوق س نگين سامان-ثابت       |1               |
+------------------------------+----------------+
only showing top 5 rows



#### SQL

To execute SQL code we should create a temporary table from our dataframe.

The SQL code just orders by 'last_price_value' and shows first 5 results

In [None]:
# Expose the dataframe to Spark SQL as a temporary view.
# createOrReplaceTempView replaces registerTempTable, which has been
# deprecated since Spark 2.0, and is safe to call again when the cell is re-run.
df.createOrReplaceTempView('sqltable')

# max: five highest closing prices
newDF = spark.sql("""
  select name, last_price_value 
  from sqltable 
  order by last_price_value desc 
  limit 5
""")
newDF.show(truncate = False)

# min: five lowest closing prices
newDF = spark.sql("""
  select name, last_price_value 
  from sqltable 
  order by last_price_value asc 
  limit 5
""")
newDF.show(truncate = False)

+-----------------------------+----------------+
|name                         |last_price_value|
+-----------------------------+----------------+
|مشاركت دولتي1-شرايط خاص001026|990000          |
|سپنتا                        |410750          |
|خوراك‌  دام‌ پارس‌           |400550          |
|صنايع‌جوشكاب‌يزد             |360970          |
|پتروشيمي فناوران             |327430          |
+-----------------------------+----------------+

+---------------------------+----------------+
|name                       |last_price_value|
+---------------------------+----------------+
|صندوق س. با درآمد ثابت كمند|1               |
|صندوق س. با درآمد ثابت كيان|1               |
|صندوق س. آرمان آتي كوثر-د  |1               |
|صندوق س ياقوت آگاه-ثابت    |1               |
|صندوق س نگين سامان-ثابت    |1               |
+---------------------------+----------------+



## Part 2



#### Read all CSVs

The results are for 2 months instead of 6 months as the question asks.

The same preprocessing is done, only this time on all of the files

In [None]:
from pyspark.sql.functions import col, split, substring, length, to_date, to_timestamp

# Read all files
df = spark.read.option("header", "true").csv("/content/drive/MyDrive/BD3/stock/*.csv")

# Preprocess: (Persian header, English name, target type or None to keep as string)
column_specs = [
    ('نماد', "symbol", None),
    ('نام', "name", None),
    ('تعداد', "quantity", 'int'),
    ('حجم', "volume", 'int'),
    ('ارزش', "value", 'long'),
    ('دیروز', "yesterday", 'int'),
    ('اولین', "first", 'int'),
    ('آخرین معامله - مقدار', "last_trade_value", 'int'),
    ('آخرین معامله - تغییر', "last_trade_change", 'int'),
    ('آخرین معامله - درصد', "last_trade_percent", 'float'),
    ('قیمت پایانی - مقدار', "last_price_value", 'int'),
    ('قیمت پایانی - تغییر', "last_price_change", 'int'),
    ('قیمت پایانی - درصد', "last_price_percent", 'float'),
    ('کمترین', "lowest", 'int'),
    ('بیشترین', "highest", 'int'),
]

# Add each English column (cast when a type is given), then drop its Persian source.
for persian_name, english_name, target_type in column_specs:
    source_column = col(persian_name)
    if target_type is not None:
        source_column = source_column.cast(target_type)
    df = df.withColumn(english_name, source_column).drop(persian_name)

# Remove the unnamed leading CSV column and parse the year-first date strings.
df = df.drop("_c0")
df = df.withColumn("date", to_date('date', 'yyyy-MM-dd'))

df.printSchema()
df.show(5, truncate = False)

root
 |-- date: date (nullable = true)
 |-- symbol: string (nullable = true)
 |-- name: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- volume: integer (nullable = true)
 |-- value: long (nullable = true)
 |-- yesterday: integer (nullable = true)
 |-- first: integer (nullable = true)
 |-- last_trade_value: integer (nullable = true)
 |-- last_trade_change: integer (nullable = true)
 |-- last_trade_percent: float (nullable = true)
 |-- last_price_value: integer (nullable = true)
 |-- last_price_change: integer (nullable = true)
 |-- last_price_percent: float (nullable = true)
 |-- lowest: integer (nullable = true)
 |-- highest: integer (nullable = true)

+----------+--------+-----------------------------+--------+--------+-------------+---------+------+----------------+-----------------+------------------+----------------+-----------------+------------------+------+-------+
|date      |symbol  |name                         |quantity|volume  |value        |yesterday

#### DataFrame

Volumes are summed for each symbol and the biggest is shown

it is خساپا

In [None]:
# Total traded volume per symbol over all loaded days.
volume_per_symbol = df.groupBy('symbol').sum('volume')

# Largest total first; the top Row is the cell's displayed result.
volume_per_symbol.sort(col('sum(volume)').desc()).first()

Row(symbol='خساپا', sum(volume)=28093676706)

#### SQL

Volumes are summed, grouped by symbol and sorted by the sum.

Highest volume belongs to خساپا

In [None]:
# Expose the dataframe to Spark SQL as a temporary view.
# createOrReplaceTempView replaces registerTempTable, which has been
# deprecated since Spark 2.0, and overwrites the view registered earlier.
df.createOrReplaceTempView('sqltable')

# max: symbol with the largest summed volume
newDF = spark.sql("""
  select symbol, sum(volume) as total_volume
  from sqltable 
  group by symbol
  order by total_volume desc
  limit 1
""")
newDF.show(truncate = False)

+------+------------+
|symbol|total_volume|
+------+------------+
|خساپا |28093676706 |
+------+------------+



## Part 3

#### DataFrame

In this question, we only need year, month, date, symbol and value. So at first, I only select these columns.

First, the dataframe is self-joined with all the records of each symbol that are ahead but in the same month. These conditions are met in filter() after the join. Then, the increase in value is calculated for each record and only maximums in each [month, symbol] group are kept. 

Then, because we want to show the top 10 records for each month, we need a kind of partitioning by month with ordering by maximum increase. So, a new row number column is created that represents the rank of each symbol in that month with respect to its maximum increase.

At last, we only output records with row numbers not more than 10


In [None]:
from pyspark.sql.functions import col, month, row_number, year
from pyspark.sql.window import Window

# Keep only columns that we need
df_tmp = df.select(year('date').alias('year'),
                   month('date').alias('month'),
                   'date',
                   'symbol',
                   'value')

# Rank symbols inside each (year, month) by their maximum increase, largest first.
windowSpec = Window.partitionBy("year", 'month').orderBy(col("max(increase)").desc())

# Pair every record (df_1) with the later records (df_2) of the same symbol in
# the same year and month. The condition is passed to the join itself, so the
# query does not depend on the optimizer turning a cartesian product plus filter
# into an equi-join.
same_symbol_later_in_month = (
    (col('df_1.date') < col('df_2.date'))
    & (col("df_1.symbol") == col("df_2.symbol"))
    & (col("df_1.month") == col("df_2.month"))
    & (col("df_1.year") == col("df_2.year"))
)

# The result gets its own name: previously the output of .show() (None) was
# assigned back to df_tmp, clobbering the dataframe.
top_increases = (
    df_tmp.alias('df_1')
    .join(df_tmp.alias('df_2'), on=same_symbol_later_in_month, how='inner')
    # new column: increase amount between the earlier and the later record
    .withColumn("increase", col('df_2.value') - col('df_1.value'))
    # maximum increase for each symbol in each month
    .groupBy(col('df_1.year'), col('df_1.month'), col('df_1.symbol'))
    .max("increase")
    # keep only the 10 best-ranked symbols of every month
    .withColumn("row_number", row_number().over(windowSpec))
    .filter(col('row_number') <= 10)
    # Sort after the window step so the displayed order is deterministic:
    # ascending by time, then by rank (a sort before the window is not preserved).
    .orderBy('year', 'month', 'row_number')
)

top_increases.show(truncate=False)


+----+-----+--------+--------------+----------+
|year|month|symbol  |max(increase) |row_number|
+----+-----+--------+--------------+----------+
|1399|11   |پالايش  |16759618165030|1         |
|1399|11   |دارا يكم|12175656560440|2         |
|1399|11   |شستا    |9800328189440 |3         |
|1399|11   |شپنا    |8729919981050 |4         |
|1399|11   |اوصتا4  |7749230000000 |5         |
|1399|11   |خساپا   |7685442557110 |6         |
|1399|11   |امين يكم|7125258897273 |7         |
|1399|11   |فولاد2  |6700200000000 |8         |
|1399|11   |فملي    |5283092056990 |9         |
|1399|11   |وتجارت  |4361376265020 |10        |
|1399|12   |شپنا4   |16817486114140|1         |
|1399|12   |پالايش  |11624833080670|2         |
|1399|12   |وغدير2  |7955327200000 |3         |
|1399|12   |اپال    |7889415735420 |4         |
|1399|12   |دارا يكم|7450902080070 |5         |
|1399|12   |بركت    |6551245134860 |6         |
|1399|12   |امين يكم|6135563904277 |7         |
|1399|12   |كمند    |5070727437865 |8   

#### SQL

Similar to the previous code, each record is joined with records of same symbol that are ahead but in the same month and maximum increase of value for each month and symbol is found.

In order to only show top 10 increases in each month, a row number is created and only records with row number not bigger than 10 are shown in output

In [None]:
# Expose the dataframe to Spark SQL as a temporary view.
# createOrReplaceTempView replaces registerTempTable, which has been
# deprecated since Spark 2.0.
df.createOrReplaceTempView('sqltable')

# stock : maximum increase of `value` per (year, month, symbol), where each
#         record s1 is compared with later records s2 of the same symbol in the
#         SAME month and year. The month/year tests must compare s1 with s2;
#         comparing s1 with itself is always true and lets pairs span months.
# stock2: rank of every symbol inside its month by that maximum increase.
# The final ORDER BY makes the displayed order deterministic; an ORDER BY inside
# the CTE would not be preserved by the outer query.
newDF = spark.sql("""
with stock as (
select  year(s1.date) as year, month(s1.date) as month, s1.symbol as symbol, max(s2.value - s1.value) as maximum_increase
from sqltable as s1 join sqltable as s2
  on s1.symbol == s2.symbol and s1.date < s2.date and month(s1.date) == month(s2.date) and year(s1.date) == year(s2.date)
group by year(s1.date), month(s1.date), s1.symbol
),
stock2 as 
(select *, ROW_NUMBER() over(partition by year, month order by maximum_increase desc) as rnn
from stock
)
select * 
from stock2
where rnn <= 10
order by year asc, month asc, rnn asc
""")

newDF.show(truncate = False)

+----+-----+--------+----------------+---+
|year|month|symbol  |maximum_increase|rnn|
+----+-----+--------+----------------+---+
|1399|11   |پالايش  |16759618165030  |1  |
|1399|11   |دارا يكم|12175656560440  |2  |
|1399|11   |امين يكم|9861407653066   |3  |
|1399|11   |شستا    |9800328189440   |4  |
|1399|11   |شپنا    |8729919981050   |5  |
|1399|11   |اوصتا4  |7749230000000   |6  |
|1399|11   |خساپا   |7685442557110   |7  |
|1399|11   |فولاد2  |6700200000000   |8  |
|1399|11   |بركت    |6452484770120   |9  |
|1399|11   |فملي    |5283092056990   |10 |
|1399|12   |شپنا4   |16817486114140  |1  |
|1399|12   |پالايش  |11624833080670  |2  |
|1399|12   |وغدير2  |7955327200000   |3  |
|1399|12   |اپال    |7889415735420   |4  |
|1399|12   |دارا يكم|7450902080070   |5  |
|1399|12   |بركت    |6551245134860   |6  |
|1399|12   |امين يكم|6135563904277   |7  |
|1399|12   |كمند    |5070727437865   |8  |
|1399|12   |شپنا    |4854738538410   |9  |
|1399|12   |اوصتا4  |4694149900000   |10 |
+----+-----

## Part 4

#### DataFrame

Similar to the last question, we self-join the dataframe with records of the same symbol that are ahead (any month) and keep the price decreases in a new column. There is a new constraint on the join. By using 'yesterday' column, we make sure the price has not increased from yesterday to today! (ریزش) 

Then we find the maximum decrease of each symbol and show the results in descending order



In [None]:
from pyspark.sql.functions import col, year, month, row_number
from pyspark.sql.window import Window

# we should check price of the day before and make sure it has not increased!
# Keep only columns that we need
df_tmp = df.select('date', 'symbol', 'last_price_value', 'yesterday')

# Pair every record (df_1) with all later records (df_2) of the same symbol,
# keeping only starting days whose closing price did not rise above the
# previous day's price. The condition is passed to the join itself, so the
# query does not depend on the optimizer rewriting a cartesian product plus filter.
later_day_after_non_increase = (
    (col('df_1.date') < col('df_2.date'))
    & (col("df_1.symbol") == col("df_2.symbol"))
    & (col("df_1.yesterday") >= col("df_1.last_price_value"))
)

# The result gets its own name: previously the output of .show() (None) was
# assigned back to df_tmp, clobbering the dataframe.
max_decreases = (
    df_tmp.alias('df_1')
    .join(df_tmp.alias('df_2'), on=later_day_after_non_increase, how='inner')
    # price drop from the earlier day to the later day
    .withColumn("decrease", col('df_1.last_price_value') - col('df_2.last_price_value'))
    # largest drop per symbol, biggest first
    .groupBy(col('df_1.symbol'))
    .max("decrease")
    .orderBy(col("max(decrease)").desc())
)

max_decreases.show(truncate=False)

+--------+-------------+
|symbol  |max(decrease)|
+--------+-------------+
|سصفها   |276820       |
|انرژي3  |120460       |
|بكاب    |93710        |
|فنورد   |75840        |
|غدام    |69180        |
|فپنتا   |66670        |
|فمراد   |53530        |
|شسينا   |51790        |
|غچين    |50630        |
|وملي    |47490        |
|سكارون  |45109        |
|كدما    |45020        |
|تكنار   |42194        |
|دشيري   |42084        |
|گكيش    |41150        |
|خفولا   |40193        |
|واحيا   |37647        |
|زبينا   |37304        |
|دارا يكم|36810        |
|وهور    |36754        |
+--------+-------------+
only showing top 20 rows



#### SQL

Very similar to the last question. However, a new constraint is added to the WHERE clause to make sure yesterday's price is not lower than today's price (ریزش). The maximum decrease is then found for each symbol and the results are sorted.

In [None]:
# Expose the dataframe to Spark SQL as a temporary view.
# createOrReplaceTempView replaces registerTempTable, which has been
# deprecated since Spark 2.0.
df.createOrReplaceTempView('sqltable')

# For each symbol: the largest drop in closing price from a day s1 (whose price
# did not rise above the previous day's) to any later day s2. Top 20 shown.
newDF = spark.sql("""
select s1.symbol, max(s1.last_price_value - s2.last_price_value) as max_decrease
from sqltable as s1 join sqltable as s2
where s1.symbol == s2.symbol and s1.date < s2.date and s1.yesterday >= s1.last_price_value
group by s1.symbol
order by max_decrease desc
limit 20
""")

newDF.show(truncate = False)


+--------+------------+
|symbol  |max_decrease|
+--------+------------+
|سصفها   |276820      |
|انرژي3  |120460      |
|بكاب    |93710       |
|فنورد   |75840       |
|غدام    |69180       |
|فپنتا   |66670       |
|فمراد   |53530       |
|شسينا   |51790       |
|غچين    |50630       |
|وملي    |47490       |
|سكارون  |45109       |
|كدما    |45020       |
|تكنار   |42194       |
|دشيري   |42084       |
|گكيش    |41150       |
|خفولا   |40193       |
|واحيا   |37647       |
|زبينا   |37304       |
|دارا يكم|36810       |
|وهور    |36754       |
+--------+------------+



## Part 5

#### DataFrame

We count the records of each symbol where the volume is not zero. This gives us the number of days on which each symbol was open.

We sort the counts in ascending order and find the stocks with least open days

In [None]:
from pyspark.sql.functions import year, month, row_number
from pyspark.sql.window import Window

# Days with a non-zero volume count as "open" days for a symbol.
traded_days = df.select('symbol', 'volume').where(col('volume') != 0)

# One row per symbol, with the record count exposed under the name days_open.
days_open_per_symbol = (
    traded_days
    .groupBy('symbol')
    .count()
    .select('symbol', col('count').alias('days_open'))
)

# Fewest open days first; ties are ordered by symbol.
days_open_per_symbol.orderBy('days_open', 'symbol').show()

+-------+---------+
| symbol|days_open|
+-------+---------+
|  آريا2|        1|
|  آسام2|        1|
|  آسام4|        1|
| آينده4|        1|
| اتكام2|        1|
|  ارزش2|        1|
| استارز|        1|
|  افاد4|        1|
|   افق2|        1|
|  اكالا|        1|
|  اميد2|        1|
|  امين4|        1|
| انرژي1|        1|
| انرژي2|        1|
|  اوان2|        1|
|  اپال2|        1|
|  اپال4|        1|
|اپرداز2|        1|
|بترانس2|        1|
|بزاگرس4|        1|
+-------+---------+
only showing top 20 rows



#### SQL

Nothing new here

In [None]:
# Expose the dataframe to Spark SQL as a temporary view.
# createOrReplaceTempView replaces registerTempTable, which has been
# deprecated since Spark 2.0.
df.createOrReplaceTempView('sqltable')

# Number of records with non-zero volume per symbol, fewest first (ties by symbol).
# The statement has no trailing semicolon, which spark.sql() does not need.
newDF = spark.sql("""
select symbol, count(volume) as days_open
from sqltable 
where volume <> 0
group by symbol 
order by days_open asc, symbol asc
LIMIT 20
""")

# A stray "S" after this call made the cell a syntax error; it is removed.
newDF.show(truncate = False)

+-------+---------+
|symbol |days_open|
+-------+---------+
|آريا2  |1        |
|آسام2  |1        |
|آسام4  |1        |
|آينده4 |1        |
|اتكام2 |1        |
|ارزش2  |1        |
|استارز |1        |
|افاد4  |1        |
|افق2   |1        |
|اكالا  |1        |
|اميد2  |1        |
|امين4  |1        |
|انرژي1 |1        |
|انرژي2 |1        |
|اوان2  |1        |
|اپال2  |1        |
|اپال4  |1        |
|اپرداز2|1        |
|بترانس2|1        |
|بزاگرس4|1        |
+-------+---------+

