
**Installing the tools required to use Spark SQL**

In [79]:
!pip install pyspark
!pip install pyprind



**Importing the libraries needed to work with Spark and to manage our data**

In [80]:
from pyspark.sql import SparkSession, Row
import json
import numpy as np
import pandas as pd
from scipy import stats
from copy import copy
import pyprind
import matplotlib.pyplot as plt
import seaborn as sns


spark = (
    SparkSession.builder
    .appName("PySpark Create ")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

**Loading the JSON file to get a general view of the data and to test our Spark session**

In [81]:
df = spark.read.json("stocks.json")
df.show()

+----------------------------+-----------------------------+-----------------+------------------+-------------+------------------+--------------+----+-------+--------------------+-------+--------------------+------+------+--------------------+--------------------+--------------------+------------+
|20-Day Simple Moving Average|200-Day Simple Moving Average|           50-Day|           52-Week|Analyst Recom|Average True Range|Average Volume|Beta| Change|             Company|EPS ttm|       Earnings Date| Price|   ROI|                 _id|         description|         performance|       ratio|
+----------------------------+-----------------------------+-----------------+------------------+-------------+------------------+--------------+----+-------+--------------------+-------+--------------------+------+------+--------------------+--------------------+--------------------+------------+
|                     -0.0172|                       0.1062|[0.0728, -0.0544]| [0.4378, -0.0544]|      

Viewing the schema

In [82]:
df.printSchema()

root
 |-- 20-Day Simple Moving Average: double (nullable = true)
 |-- 200-Day Simple Moving Average: double (nullable = true)
 |-- 50-Day: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- 52-Week: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- Analyst Recom: double (nullable = true)
 |-- Average True Range: double (nullable = true)
 |-- Average Volume: double (nullable = true)
 |-- Beta: double (nullable = true)
 |-- Change: double (nullable = true)
 |-- Company: string (nullable = true)
 |-- EPS ttm: double (nullable = true)
 |-- Earnings Date: struct (nullable = true)
 |    |-- $date: string (nullable = true)
 |-- Price: double (nullable = true)
 |-- ROI: double (nullable = true)
 |-- _id: struct (nullable = true)
 |    |-- $oid: string (nullable = true)
 |-- description: struct (nullable = true)
 |    |-- Country: string (nullable = true)
 |    |-- Industry: string (nullable = true)
 |    |-- Sector: string (nullable = true)


**Testing a first "select" using Spark**

In [83]:
df.select("company").show()

+--------------------+
|             company|
+--------------------+
|Agilent Technolog...|
|         Alcoa, Inc.|
|WCM/BNY Mellon Fo...|
|iShares MSCI AC A...|
|Altisource Asset ...|
|Atlantic American...|
|       Aaron's, Inc.|
|Applied Optoelect...|
|           AAON Inc.|
|Advance Auto Part...|
|          Apple Inc.|
|American Assets T...|
|Almaden Minerals ...|
|Advantage Oil & G...|
|Atlas Air Worldwi...|
|iShares MSCI All ...|
|AllianceBernstein...|
|         Abaxis Inc.|
|            ABB Ltd.|
|         AbbVie Inc.|
+--------------------+
only showing top 20 rows



# Manipulating the Dataset

**We will begin by cleaning the JSON data using Python**

In [84]:
df_raw = pd.read_json("stocks.json", lines=True)
df_raw.head(2)

Unnamed: 0,_id,Company,Price,Earnings Date,description,20-Day Simple Moving Average,200-Day Simple Moving Average,50-Day,52-Week,Analyst Recom,Average True Range,Average Volume,Beta,Change,EPS ttm,ROI,ratio,performance
0,{'$oid': '52853800bb1177ca391c17ff'},Agilent Technologies Inc.,50.44,{'$date': '2013-11-14T21:30:00.000+0000'},"{'Country': 'USA', 'Sector': 'Healthcare', 'In...",-0.0172,0.1062,"[0.0728, -0.054400000000000004]","[0.4378, -0.054400000000000004]",1.6,0.86,2569.36,1.5,-0.0148,2.68,0.163,"{'quick': 2.3, 'current': 3}","{'Year': 0.4242, 'Half Year': 0.1439, 'Month':..."
1,{'$oid': '52853800bb1177ca391c1800'},"Alcoa, Inc.",9.02,{'$date': '2013-10-08T20:30:00.000+0000'},"{'Country': 'USA', 'Sector': 'Basic Materials'...",-0.0192,0.0823,"[0.1579, -0.0925]","[0.1899, -0.0925]",3.1,0.3,26728.11,2.02,0.0033,0.25,0.007,"{'quick': 0.7000000000000001, 'current': 1.2}","{'Year': 0.09630000000000001, 'Half Year': 0.0..."
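
The `lines=True` argument tells pandas to treat the file as JSON Lines, that is, one JSON object per line rather than a single JSON array. A minimal standard-library sketch of the same idea follows. The two records are an abbreviated, illustrative subset of the rows shown above, not the full file.

```python
import json

# Two abbreviated records in JSON Lines layout: one JSON object per line.
# Only three of the dataset's fields are kept here, for illustration.
raw = (
    '{"_id": {"$oid": "52853800bb1177ca391c17ff"}, '
    '"Company": "Agilent Technologies Inc.", "Price": 50.44}\n'
    '{"_id": {"$oid": "52853800bb1177ca391c1800"}, '
    '"Company": "Alcoa, Inc.", "Price": 9.02}\n'
)

# Each non-empty line is parsed on its own, independently of the others
records = [json.loads(line) for line in raw.splitlines() if line.strip()]

print(len(records))
print(records[1]["Company"])
```

Parsing line by line is also why a malformed record only affects its own line instead of the whole file.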


Viewing the dataset shape and finding missing data

In [85]:
def resumetable(df):
    print(f"Dataset Shape: {df.shape}")
    summary = pd.DataFrame(df.dtypes,columns=['dtypes'])
    summary = summary.reset_index()
    summary['Name'] = summary['index']
    summary = summary[['Name','dtypes']]
    summary['Missing'] = df.isnull().sum().values    
    summary['First Value'] = df.loc[0].values
    summary['Second Value'] = df.loc[1].values
    summary['Third Value'] = df.loc[2].values

    for name in summary['Name'].value_counts().index:
        summary.loc[summary['Name'] == name, 'Entropy'] = round(stats.entropy(df[name].value_counts(normalize=True), base=2),2) 

    return summary

resumetable(df_raw)

Dataset Shape: (6756, 18)


| | Name | dtypes | Missing | First Value | Second Value | Third Value | Entropy |
|---|---|---|---|---|---|---|---|
| 0 | _id | object | 0 | {'$oid': '52853800bb1177ca391c17ff'} | {'$oid': '52853800bb1177ca391c1800'} | {'$oid': '52853800bb1177ca391c1801'} | 12.72 |
| 1 | Company | object | 0 | Agilent Technologies Inc. | Alcoa, Inc. | WCM/BNY Mellon Focused Growth ADR ETF | 12.72 |
| 2 | Price | float64 | 13 | 50.44 | 9.02 | 36.4 | 11.86 |
| 3 | Earnings Date | object | 2198 | {'$date': '2013-11-14T21:30:00.000+0000'} | {'$date': '2013-10-08T20:30:00.000+0000'} | | 7.25 |
| 4 | description | object | 0 | {'Country': 'USA', 'Sector': 'Healthcare', 'In... | {'Country': 'USA', 'Sector': 'Basic Materials'... | {'Country': 'USA', 'Sector': 'Financial', 'Ind... | 6.89 |
| 5 | 20-Day Simple Moving Average | float64 | 14 | -0.0172 | -0.0192 | -0.0054 | 10.57 |
| 6 | 200-Day Simple Moving Average | float64 | 14 | 0.1062 | 0.0823 | 0.0693 | 11.78 |
| 7 | 50-Day | object | 0 | [0.0728, -0.054400000000000004] | [0.1579, -0.0925] | [0.0792, -0.0194] | 12.71 |
| 8 | 52-Week | object | 0 | [0.4378, -0.054400000000000004] | [0.1899, -0.0925] | [0.2727, -0.0194] | 12.71 |
| 9 | Analyst Recom | float64 | 2591 | 1.6 | 3.1 | | 4.26 |
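
The `Entropy` column is the Shannon entropy, in bits, of each column's value distribution, which `resumetable` obtains from `stats.entropy(..., base=2)`. A minimal standard-library sketch of the same calculation is shown below. The name `shannon_entropy` is introduced here for illustration and is not part of the notebook.

```python
import math
from collections import Counter


def shannon_entropy(values):
    """Shannon entropy, in bits, of the empirical distribution of `values`."""
    counts = Counter(values)
    total = sum(counts.values())
    # H = -sum(p * log2(p)) over the observed relative frequencies p
    return -sum((c / total) * math.log2(c / total) for c in counts.values())


# Two equally frequent values
print(shannon_entropy(["a", "b", "a", "b"]))

# 6756 distinct values, i.e. a column in which every row is unique
print(round(shannon_entropy(range(6756)), 2))
```

When every value is unique, the entropy reduces to log2 of the row count. For 6756 rows that is about 12.72, which matches the value reported for `_id` and `Company`.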


In [86]:
for col in df_raw:
    print(col)
    print(type(df_raw[col].values[0]))
    print("---------------")

_id
<class 'dict'>
---------------
Company
<class 'str'>
---------------
Price
<class 'numpy.float64'>
---------------
Earnings Date
<class 'dict'>
---------------
description
<class 'dict'>
---------------
20-Day Simple Moving Average
<class 'numpy.float64'>
---------------
200-Day Simple Moving Average
<class 'numpy.float64'>
---------------
50-Day
<class 'list'>
---------------
52-Week
<class 'list'>
---------------
Analyst Recom
<class 'numpy.float64'>
---------------
Average True Range
<class 'numpy.float64'>
---------------
Average Volume
<class 'numpy.float64'>
---------------
Beta
<class 'numpy.float64'>
---------------
Change
<class 'numpy.float64'>
---------------
EPS ttm
<class 'numpy.float64'>
---------------
ROI
<class 'numpy.float64'>
---------------
ratio
<class 'dict'>
---------------
performance
<class 'dict'>
---------------


In [87]:
# Unwrap the MongoDB-style {'$oid': ...} wrapper to keep only the id string
_id_column = df_raw["_id"].apply(lambda x: x.get('$oid'))
_id_column

0       52853800bb1177ca391c17ff
1       52853800bb1177ca391c1800
2       52853800bb1177ca391c1801
3       52853800bb1177ca391c1802
4       52853800bb1177ca391c1803
                  ...           
6751    52853810bb1177ca391c325e
6752    52853810bb1177ca391c325f
6753    52853810bb1177ca391c3260
6754    52853810bb1177ca391c3261
6755    52853810bb1177ca391c3262
Name: _id, Length: 6756, dtype: object

In [88]:
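# Unwrap {'$date': ...}; anything that is not a dict (None or NaN) becomes NaN
date_column = df_raw["Earnings Date"].apply(
    lambda x: x.get('$date') if isinstance(x, dict) else np.nan)
# Parse the ISO strings into timezone-aware timestamps (missing values become NaT)
date_column = pd.to_datetime(date_column)
date_column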

0      2013-11-14 21:30:00+00:00
1      2013-10-08 20:30:00+00:00
2                            NaT
3                            NaT
4                            NaT
                  ...           
6751                         NaT
6752                         NaT
6753   2013-11-05 13:30:00+00:00
6754   2013-12-05 21:30:00+00:00
6755   2013-11-20 13:30:00+00:00
Name: Earnings Date, Length: 6756, dtype: datetime64[ns, UTC]
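
The two cells above unwrap MongoDB Extended JSON wrappers (`$oid` and `$date`) into plain values. The same unwrapping can be sketched for a single record with the standard library only. The helper names `unwrap_oid` and `unwrap_date` are hypothetical, and the date format string assumes every `$date` value follows the layout seen in this dataset.

```python
from datetime import datetime, timezone


def unwrap_oid(value):
    """Return the id string from a {'$oid': ...} wrapper, or None if missing."""
    return value.get("$oid") if isinstance(value, dict) else None


def unwrap_date(value):
    """Parse a {'$date': ...} wrapper into a UTC datetime, or None if missing."""
    if not isinstance(value, dict):
        return None
    # Assumed layout: 2013-11-14T21:30:00.000+0000
    parsed = datetime.strptime(value["$date"], "%Y-%m-%dT%H:%M:%S.%f%z")
    return parsed.astimezone(timezone.utc)


record = {
    "_id": {"$oid": "52853800bb1177ca391c17ff"},
    "Earnings Date": {"$date": "2013-11-14T21:30:00.000+0000"},
}

print(unwrap_oid(record["_id"]))
print(unwrap_date(record["Earnings Date"]))
print(unwrap_date(None))
```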

In [89]:
# Replace a missing description with a dict of empty fields
desc_column = df_raw["description"].apply(
    lambda x: x if x is not None else {"Country": None, "Sector": None, "Industry": None})
desc_column = desc_column.fillna("null")

In [90]:
# Replace a missing performance record with a dict of empty fields
perf_column = df_raw["performance"].apply(
    lambda x: x if x is not None else {"Year": None, "Half Year": None, "Month": None, "Week": None})
perf_column = perf_column.fillna(0.0)

In [91]:
# Replace a missing ratio record with a dict of empty fields
ratio_column = df_raw["ratio"].apply(
    lambda x: x if x is not None else {"quick": None, "current": None})
ratio_column = ratio_column.fillna(0.0)
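
The three cells above aim to substitute a default dictionary whenever a nested value is missing. A minimal sketch of such a check is given below, assuming a missing value can arrive either as `None` or as a float `NaN`. The helpers `is_missing` and `with_default` are hypothetical names, not part of the notebook.

```python
import math


def is_missing(value):
    """True for None and for float NaN, two common markers of a missing value."""
    return value is None or (isinstance(value, float) and math.isnan(value))


def with_default(value, default):
    """Return a fresh copy of `default` when `value` is missing, else `value`."""
    return dict(default) if is_missing(value) else value


RATIO_DEFAULT = {"quick": None, "current": None}

print(with_default({"quick": 2.3, "current": 3}, RATIO_DEFAULT))
print(with_default(None, RATIO_DEFAULT))
print(with_default(float("nan"), RATIO_DEFAULT))
```

The condition has to name the value being tested (`x is not None`). A bare `not None` is always true, so a conditional built on it never reaches its fallback branch.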

In [92]:
# Work on a copy so that df_raw keeps the original, unmodified data
df_clean = df_raw.copy()
df_clean["_id"] = _id_column
df_clean["Earnings Date"] = date_column
df_clean["description"] = desc_column
df_clean["performance"] = perf_column
df_clean["ratio"] = ratio_column
# Rename the columns to short names that are easy to use in SQL
df_clean.columns = [
    "id", "company", "price", "earningsdate",
    "description", "SMA20", "SMA200", "days50highlow",
    "weeks52highlow", "analyst_recommendation",
    "avg_true_rate", "avg_vol", "beta", "change",
    "EPSttm", "ROI", "ratio", "performance",
]

df_clean.head(2)

Unnamed: 0,id,company,price,earningsdate,description,SMA20,SMA200,days50highlow,weeks52highlow,analyst_recommendation,avg_true_rate,avg_vol,beta,change,EPSttm,ROI,ratio,performance
0,52853800bb1177ca391c17ff,Agilent Technologies Inc.,50.44,2013-11-14 21:30:00+00:00,"{'Country': 'USA', 'Sector': 'Healthcare', 'In...",-0.0172,0.1062,"[0.0728, -0.054400000000000004]","[0.4378, -0.054400000000000004]",1.6,0.86,2569.36,1.5,-0.0148,2.68,0.163,"{'quick': 2.3, 'current': 3}","{'Year': 0.4242, 'Half Year': 0.1439, 'Month':..."
1,52853800bb1177ca391c1800,"Alcoa, Inc.",9.02,2013-10-08 20:30:00+00:00,"{'Country': 'USA', 'Sector': 'Basic Materials'...",-0.0192,0.0823,"[0.1579, -0.0925]","[0.1899, -0.0925]",3.1,0.3,26728.11,2.02,0.0033,0.25,0.007,"{'quick': 0.7000000000000001, 'current': 1.2}","{'Year': 0.09630000000000001, 'Half Year': 0.0..."


Exporting into a CSV file

In [93]:
df_clean.to_csv("stocks_clean.csv", index=False)

Exporting into a JSON file

In [94]:
df_clean.to_json("stocks_clean.json", orient='records')
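
With its default `date_format`, `to_json` with `orient='records'` writes datetime columns as epoch milliseconds, so `earningsdate` reaches Spark as a large integer rather than a timestamp string. A small standard-library sketch of converting such a value back follows. The helper name `epoch_ms_to_datetime` is hypothetical, and the sample value is one `earningsdate` taken from this dataset.

```python
from datetime import datetime, timezone


def epoch_ms_to_datetime(ms):
    """Convert milliseconds since the Unix epoch to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(ms / 1000, tz=timezone.utc)


print(epoch_ms_to_datetime(1383687000000))
```

If ISO 8601 strings are preferred in the exported file, passing `date_format='iso'` to `to_json` is an alternative worth considering.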

**Part 1: simple queries**

Selecting a specific company details

In [95]:
df = spark.read.json("stocks_clean.json")
# registerTempTable is deprecated; createOrReplaceTempView is its replacement
df.createOrReplaceTempView("stocks")
spark.sql("Select id, company, price from stocks where company = 'AbbVie Inc.'").show()



+--------------------+-----------+-----+
|                  id|    company|price|
+--------------------+-----------+-----+
|52853800bb1177ca3...|AbbVie Inc.|48.11|
+--------------------+-----------+-----+



Counting all rows

In [96]:
spark.sql("select count('*') from stocks").show()

+--------+
|count(*)|
+--------+
|    6756|
+--------+



Selecting with a defined ID

In [97]:
spark.sql("select * from stocks where id='5285380dbb1177ca391c2f9a'").show()

+------+------+-------+------+----------------------+-------------+--------+----+-------+------------------+-----------------+--------------------+-------------+--------------------+--------------------+------+----------+-----------------+
|EPSttm|   ROI|  SMA20|SMA200|analyst_recommendation|avg_true_rate| avg_vol|beta| change|           company|    days50highlow|         description| earningsdate|                  id|         performance| price|     ratio|   weeks52highlow|
+------+------+-------+------+----------------------+-------------+--------+----+-------+------------------+-----------------+--------------------+-------------+--------------------+--------------------+------+----------+-----------------+
| -1.36|-0.667|-0.1504|0.3027|                   2.5|        10.13|11357.89|0.39|-0.0112|Tesla Motors, Inc.|[0.0364, -0.2949]|{USA, Auto Manufa...|1383687000000|5285380dbb1177ca3...|{0.6348, -0.2459,...|137.14|{2.3, 1.8}|[3.4964, -0.2949]|
+------+------+-------+------+----------