### Installing Spark

Install Dependencies:


1.   Java 8
2.   Apache Spark with hadoop
3.   Findspark (used to locate the spark in the system)


In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
!wget http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz

--2021-06-13 07:01:49--  http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
Resolving archive.apache.org (archive.apache.org)... 138.201.131.134, 2a01:4f8:172:2ec5::2
Connecting to archive.apache.org (archive.apache.org)|138.201.131.134|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 228721937 (218M) [application/x-gzip]
Saving to: ‘spark-3.1.1-bin-hadoop3.2.tgz’


2021-06-13 07:02:02 (17.3 MB/s) - ‘spark-3.1.1-bin-hadoop3.2.tgz’ saved [228721937/228721937]



In [3]:
!pip install -q findspark

Set Environment Variables:

In [4]:
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

In [5]:
import findspark

findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()
# Property used to format output tables better
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark

### Load Dataset

Download stock data from tsetmc site
<br />
**NOTE**: below code must run on server with iran ip address
<br />
I run script in local system and then upload result to google drive manualy

In [None]:
import requests
import io
import re

NAME_RE = re.compile(r"^[\u0600-\u06FF\s]+[0-9]?$")

import pandas as pd

for month in range(10, 13):
    for day in range(1, 32):
        # donwload xlsx file
        date = '1399-%02d-%02d' % (month, day)
        headers = {'user-agent': 'Chrome/61.0'}
        query = {'d': date}
        url = 'http://members.tsetmc.com/tsev2/excel/MarketWatchPlus.aspx'
        r = requests.get(url, params=query, headers=headers, allow_redirects=True)
        if len(r.content) > 10240:
            # convert xlsx to csv
            with io.BytesIO(r.content) as fh:
                df = pd.io.excel.read_excel(fh, index_col=0, header=2)

                # filter symbol name
                idx = [bool(re.match(NAME_RE, symbol)) for symbol in df.index]
                df = df[idx]
                df.index.names = ['symbol']
                df.columns = ['name', 'count', 'capacity', 'value',
              		       'yesterday', 'first',
              		       'last_deal', 'last_deal - diff', 'last_deal - percent',
              		       'final_price', 'final_price - diff', 'final_price - percent',
              		       'min', 'max']
                # add trade date column
                df['trade_date'] = '1399-%02d-%02d' % (month, day)
                df['year'] = 1399
                df['month'] = month
                df['day'] = day
                filename = './stocks/1399-%02d-%02d.csv' % (month, day)
                df.to_csv(filename)
                print('%s --> done' % filename)

Mount google drive

In [6]:
from google.colab import drive
drive.mount('/gdrive')

Mounted at /gdrive


In [7]:
#check stock files
!ls /gdrive/MyDrive/stocks

1399-10-01.csv	1399-10-20.csv	1399-11-07.csv	1399-11-27.csv	1399-12-16.csv
1399-10-02.csv	1399-10-21.csv	1399-11-08.csv	1399-11-28.csv	1399-12-17.csv
1399-10-03.csv	1399-10-22.csv	1399-11-11.csv	1399-11-29.csv	1399-12-18.csv
1399-10-06.csv	1399-10-23.csv	1399-11-12.csv	1399-12-02.csv	1399-12-19.csv
1399-10-07.csv	1399-10-24.csv	1399-11-13.csv	1399-12-03.csv	1399-12-20.csv
1399-10-08.csv	1399-10-27.csv	1399-11-14.csv	1399-12-04.csv	1399-12-23.csv
1399-10-09.csv	1399-10-29.csv	1399-11-15.csv	1399-12-05.csv	1399-12-24.csv
1399-10-10.csv	1399-10-30.csv	1399-11-18.csv	1399-12-06.csv	1399-12-25.csv
1399-10-13.csv	1399-10-31.csv	1399-11-19.csv	1399-12-09.csv	1399-12-26.csv
1399-10-14.csv	1399-11-01.csv	1399-11-20.csv	1399-12-10.csv	1399-12-27.csv
1399-10-15.csv	1399-11-04.csv	1399-11-21.csv	1399-12-11.csv
1399-10-16.csv	1399-11-05.csv	1399-11-25.csv	1399-12-12.csv
1399-10-17.csv	1399-11-06.csv	1399-11-26.csv	1399-12-13.csv


In [8]:
from pyspark.sql.functions import col
from pyspark.sql.functions import to_date

# Load data into dataframe for use in Spark Dataframes
df = spark.read.csv('/gdrive/MyDrive/stocks/*.csv', header=True)

# fix columns type
df = df.withColumn('capacity', col('capacity').cast('int'))
df = df.withColumn('final_price - diff', col('final_price - diff').cast('int'))

df = df.withColumn('trade_date', to_date(df.trade_date, 'yyyy-MM-dd'))
df = df.withColumn('year', col('year').cast('int'))
df = df.withColumn('month', col('month').cast('int'))
df = df.withColumn('day', col('day').cast('int'))

df.printSchema()

root
 |-- symbol: string (nullable = true)
 |-- name: string (nullable = true)
 |-- count: string (nullable = true)
 |-- capacity: integer (nullable = true)
 |-- value: string (nullable = true)
 |-- yesterday: string (nullable = true)
 |-- first: string (nullable = true)
 |-- last_deal: string (nullable = true)
 |-- last_deal - diff: string (nullable = true)
 |-- last_deal - percent: string (nullable = true)
 |-- final_price: string (nullable = true)
 |-- final_price - diff: integer (nullable = true)
 |-- final_price - percent: string (nullable = true)
 |-- min: string (nullable = true)
 |-- max: string (nullable = true)
 |-- trade_date: date (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)



In [9]:
# Register Temporary Table for using in Spark SQL
df.createOrReplaceTempView('stock')

### Q1
Find top 10 most\least expensive symbols in last day (1399-12-27)

#### Spark Dataframes

In [10]:
from pyspark.sql.functions import lit

# most expensive
res_df = df.filter(df.trade_date == lit("1399-12-27")).select('symbol', 'final_price')
res_df.orderBy('final_price', ascending=False).limit(10)

symbol,final_price
صايند,9939
شگويا,9919
اشاد1,990000
شپلي,9885
وتوسم2,9800
سحرخيز,9762
شتران,9730
وتوسم,9700
لپارس,95940
پترولح,9560


In [11]:
from pyspark.sql.functions import lit

# least expensive
res_df = df.filter(df.trade_date == lit("1399-12-27")).select('symbol', 'final_price')
res_df.orderBy('final_price').limit(10)

symbol,final_price
كمند2,1
كيان2,1
كارين2,1
ياقوت2,1
امين يكم2,1
نهال2,1
آگاس2,1
اعتماد2,1
وبازار2,1
آكورد2,1


#### Spark SQL

In [12]:
# most expensive
spark.sql(
    '''
    select symbol, final_price from stock where trade_date="1399-12-27"
    order by final_price desc limit 10
    '''
)

symbol,final_price
صايند,9939
شگويا,9919
اشاد1,990000
شپلي,9885
وتوسم2,9800
سحرخيز,9762
شتران,9730
وتوسم,9700
لپارس,95940
پترولح,9560


In [13]:
# least expensive
spark.sql(
    '''
    select symbol, final_price from stock where trade_date="1399-12-27"
    order by final_price asc limit 10
    '''
)

symbol,final_price
كمند2,1
كيان2,1
كارين2,1
ياقوت2,1
امين يكم2,1
نهال2,1
آگاس2,1
اعتماد2,1
وبازار2,1
آكورد2,1


### Q2
Find most traded symbol in total

#### Spark Dataframes

In [14]:
res_df = df.groupBy('symbol').sum('capacity')
res_df.orderBy('sum(capacity)', ascending=False).limit(1)

symbol,sum(capacity)
خساپا,48072085105


#### Spark SQL

In [15]:
spark.sql(
    '''
    select symbol, sum(capacity) as total_capacity from stock
    group by symbol
    order by total_capacity desc limit 1
    '''
)

symbol,total_capacity
خساپا,48072085105


### Q3
Find top 10 symbols with most raise in price per each month

#### Spark Dataframes

In [16]:
from pyspark.sql.functions import row_number
from pyspark.sql.window import Window

res_df = df.groupBy('symbol', 'month').sum('final_price - diff')

res_df.withColumn('rank', 
                  row_number()
                  .over(Window.partitionBy('month').orderBy(col("sum(final_price - diff)").desc()))
).filter(col('rank')<=10)

symbol,month,sum(final_price - diff),rank
عپلي جم,12,135483,1
رويش,12,80698,2
گندم2,12,54000,3
رافزا,12,50265,4
غمهراح,12,45750,5
شفن,12,38690,6
زاگرس,12,37665,7
عكاوه2,12,34498,8
نمرينو,12,34220,9
شصفها4,12,31144,10


#### Spark SQL

In [17]:
spark.sql(
    '''
    select * from (
      select *, (rank() over (partition by month order by total_change desc)) rank
      from (
        select symbol, month, sum(`final_price - diff`) as total_change from stock
        group by symbol, month
      )
    )
    where rank<=10
    '''
)

symbol,month,total_change,rank
عپلي جم,12,135483,1
رويش,12,80698,2
گندم2,12,54000,3
رافزا,12,50265,4
غمهراح,12,45750,5
شفن,12,38690,6
زاگرس,12,37665,7
عكاوه2,12,34498,8
نمرينو,12,34220,9
شصفها4,12,31144,10


### Q4
Find top 10 symbols with falling price in total

#### Spark Dataframes


In [18]:
res_df = df.groupBy('symbol').sum('final_price - diff')
res_df.orderBy('sum(final_price - diff)').limit(10)

symbol,sum(final_price - diff)
اطلس2,-10768859
فيروزه2,-2882302
آگاس2,-2825522
سرو2,-1773884
كيان2,-1538564
آكورد2,-1433662
عيار2,-1350706
طلا2,-1171043
گوهر2,-911593
اعتماد2,-765130


#### Spark SQL


In [19]:
spark.sql(
    '''
    select symbol, sum(`final_price - diff`) as total_change from stock
    group by symbol
    order by total_change asc limit 10
    '''
)

symbol,total_change
اطلس2,-10768859
فيروزه2,-2882302
آگاس2,-2825522
سرو2,-1773884
كيان2,-1538564
آكورد2,-1433662
عيار2,-1350706
طلا2,-1171043
گوهر2,-911593
اعتماد2,-765130


### Q5
Find most closed symbol in total

#### Spark Dataframes

In [20]:
total_days = df.select('trade_date').distinct().count()

res_df = df.filter(col('capacity')!=0).groupBy('symbol').count()
res_df = res_df.select('symbol', (total_days-col('count')).alias('closed_days'))
res_df.orderBy('closed_days', ascending=False).limit(1)

symbol,closed_days
فملي4,61


#### Spark SQL

In [21]:
spark.sql(
    '''
    select symbol, (62-count(*)) as closed_day from stock
    where capacity!=0
    group by symbol
    order by closed_day desc limit 1
    '''
)

symbol,closed_day
فملي4,61
