<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Beginners-Guide-to-PySpark" data-toc-modified-id="Beginners-Guide-to-PySpark-1"><span class="toc-item-num">1&nbsp;&nbsp;</span><font color="tomato">Beginners Guide to PySpark</font></a></span><ul class="toc-item"><li><span><a href="#Reading-Data" data-toc-modified-id="Reading-Data-1.1"><span class="toc-item-num">1.1&nbsp;&nbsp;</span>Reading Data</a></span><ul class="toc-item"><li><span><a href="#Download-Kaggle-Movie-Dataset" data-toc-modified-id="Download-Kaggle-Movie-Dataset-1.1.1"><span class="toc-item-num">1.1.1&nbsp;&nbsp;</span>Download Kaggle Movie Dataset</a></span></li></ul></li><li><span><a href="#Import-Modules" data-toc-modified-id="Import-Modules-1.2"><span class="toc-item-num">1.2&nbsp;&nbsp;</span>Import Modules</a></span></li><li><span><a href="#Read-Data" data-toc-modified-id="Read-Data-1.3"><span class="toc-item-num">1.3&nbsp;&nbsp;</span>Read Data</a></span></li><li><span><a href="#Inspect-the-data" data-toc-modified-id="Inspect-the-data-1.4"><span class="toc-item-num">1.4&nbsp;&nbsp;</span>Inspect the data</a></span></li><li><span><a href="#Column-Operations/Manipulations" data-toc-modified-id="Column-Operations/Manipulations-1.5"><span class="toc-item-num">1.5&nbsp;&nbsp;</span>Column Operations/Manipulations</a></span><ul class="toc-item"><li><span><a href="#How-to-use-Aggregation" data-toc-modified-id="How-to-use-Aggregation-1.5.1"><span class="toc-item-num">1.5.1&nbsp;&nbsp;</span>How to use Aggregation</a></span></li></ul></li></ul></li></ul></div>

# <font color='tomato'>Beginners Guide to PySpark</font>

https://towardsdatascience.com/beginners-guide-to-pyspark-bbe3b553b79f

https://github.com/syamkakarla98/Beginners_Guide_to_PySpark

In [None]:
!pip install pyspark  

Create a Spark session:

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local[*]")\
        .appName('PySpark_Tutorial')\
        .getOrCreate()

## Reading Data

### Download Kaggle Movie Dataset

Use the Kaggle API token (`kaggle.json`) to download the US stock prices dataset (`dinnymathew/usstockprices`).

In [54]:
from google.colab import files

## Upload your kaggle json file (API Token)
files.upload()

# -p: don't fail if the directory already exists (e.g. when the cell is re-run)
!mkdir -p ~/.kaggle

!cp kaggle.json ~/.kaggle/

!chmod 600 ~/.kaggle/kaggle.json

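Outside Colab (for example on a local machine), the three shell steps above (create `~/.kaggle`, copy the token, restrict its permissions) can be sketched in plain Python. This is a minimal sketch: `install_kaggle_token` is a hypothetical helper, and the default location assumes the Kaggle CLI reads its token from `~/.kaggle/kaggle.json`, as the commands above do.

```python
import os
import shutil
import stat
from pathlib import Path


def install_kaggle_token(src="kaggle.json", kaggle_dir=None):
    """Hypothetical helper: copy a Kaggle API token into place with owner-only permissions."""
    kaggle_dir = Path(kaggle_dir) if kaggle_dir else Path.home() / ".kaggle"
    # Equivalent of `mkdir -p`: no error if the directory already exists
    kaggle_dir.mkdir(parents=True, exist_ok=True)
    dest = kaggle_dir / "kaggle.json"
    # Equivalent of `cp kaggle.json ~/.kaggle/` (overwrites an older token)
    shutil.copyfile(src, dest)
    # Equivalent of `chmod 600`: read/write for the owner only
    os.chmod(dest, stat.S_IRUSR | stat.S_IWUSR)
    return dest
```

Calling `install_kaggle_token("kaggle.json")` twice is safe, which is the main point of using `exist_ok=True` here.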


In [None]:
!kaggle datasets download -d dinnymathew/usstockprices

In [None]:
!ls

In [None]:
!mkdir data

!unzip usstockprices -d data

In [None]:
!ls -l data/

## Import Modules

In [9]:
from pyspark.sql import functions as f

import pandas as pd

import seaborn as sns

import matplotlib.pyplot as plt

%matplotlib inline

## Read Data

In [None]:
# Before changing schema
b_data = spark.read.csv(
    'data/stocks_price_final.csv',
    sep = ',',
    header = True,
    )

b_data.printSchema()

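In the schema above every column is a `string`: by default `spark.read.csv` does not infer column types (`inferSchema` defaults to `False`).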

A Spark schema describes the structure of a DataFrame or Dataset. We can define it with the `StructType` class, which is a collection of `StructField`s; each `StructField` defines a column name (string), a column type (`DataType`), whether the column is nullable (boolean), and optional metadata. Spark can infer the schema from the data (for CSV files, only when `inferSchema=True` is set), but the inferred types are sometimes wrong, and we may also want to choose our own column names and types, especially when working with unstructured and semi-structured data.

The code below creates the structure with `StructType` and `StructField`, then passes it to the `schema` parameter of `spark.read.csv()` when reading the data.

In [11]:
from pyspark.sql.types import (
    StructType, StructField, IntegerType, StringType, DateType, DoubleType,
)

# Each StructField takes a column name, a data type and a nullable flag
data_schema = [
    StructField('_c0', IntegerType(), True),
    StructField('symbol', StringType(), True),
    # Date column: named 'data' here and referred to by that name throughout the notebook
    StructField('data', DateType(), True),
    StructField('open', DoubleType(), True),
    StructField('high', DoubleType(), True),
    StructField('low', DoubleType(), True),
    StructField('close', DoubleType(), True),
    StructField('volume', IntegerType(), True),
    StructField('adjusted', DoubleType(), True),
    StructField('market.cap', StringType(), True),
    StructField('sector', StringType(), True),
    StructField('industry', StringType(), True),
    StructField('exchange', StringType(), True),
]

final_struc = StructType(fields=data_schema)

In [12]:
data = spark.read.csv(
    'data/stocks_price_final.csv',
    sep = ',',
    header = True,
    schema = final_struc
    )

In [None]:
data.printSchema()

In [None]:
data.show(5)

In [15]:
data = data.withColumnRenamed('market.cap', 'market_cap')

## Inspect the data

In [None]:
# Returns the schema as a StructType object (printSchema() prints it as a tree)
data.schema

In [None]:
data.dtypes

In [None]:
data.head(3)

In [None]:
data.show(5)

In [None]:
data.first()

In [None]:
# take(n) returns the first n rows as a list of Row objects
data.take(3)

In [None]:
data.describe().show()

In [None]:
data.columns

In [None]:
data.count()

In [None]:
data.distinct().count() 

In [None]:
data.printSchema()

## Column Operations/Manipulations

In [None]:
# Add a column: withColumn takes two parameters, the new column's name
# and a Column expression for its values.
data = data.withColumn('date', data.data)

data.show(5)

In [None]:
# Rename a column: withColumnRenamed takes two parameters, the existing
# column name and the new column name.

data = data.withColumnRenamed('date', 'data_changed')

data.show(5)

In [None]:
data = data.drop('data_changed')

data.show(5)

In [None]:
data.select(['open', 'high', 'low', 'close', 'volume', 'adjusted']).describe().show()

In [None]:
data.groupBy('sector').count().show()

In [32]:
sec_x =  data.select(['sector', 'open', 'close', 'adjusted']).groupBy('sector').mean().collect()

Convert each row into a **list**:

In [None]:
for row in sec_x:
    print(list(row))

Convert each row into a **dictionary**:

In [None]:
for row in sec_x:
    print(row.asDict())

Convert the data into a pandas **DataFrame**:

In [35]:
sec_df =  data.select(['sector', 'open', 'close', 'adjusted']).groupBy('sector').mean().toPandas()

In [None]:
sec_df

In [None]:
sec_df.plot(kind = 'bar', x='sector', y = sec_df.columns.tolist()[1:], figsize=(12, 6))

Remove **Basic Industries** from the plot and view it again:

In [None]:
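# Filter by name rather than by position: Spark's groupBy output has no guaranteed row order.
# Assumes the sector label is exactly 'Basic Industries'.
sec_df[sec_df['sector'] != 'Basic Industries'].plot(kind='bar', x='sector', y=sec_df.columns.tolist()[1:], figsize=(12, 6), ylabel='Stock Price', xlabel='Sector')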
plt.show()
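Rows returned by a Spark `groupBy` are not guaranteed to come back in any particular order, so removing a row by its position in the pandas result can drop the wrong sector between runs. A minimal pandas sketch contrasting the two approaches; `sec_demo` and its values are made up.

```python
import pandas as pd

# Made-up stand-in for sec_df
sec_demo = pd.DataFrame({
    "sector": ["Finance", "Basic Industries", "Technology"],
    "avg(open)": [50.0, 900.0, 120.0],
})

# Positional drop: correct only if 'Basic Industries' happens to sit at index 1
by_position = sec_demo.drop(index=1)

# Value-based filter: correct regardless of row order
by_value = sec_demo[sec_demo["sector"] != "Basic Industries"]

print(by_value)
```

If the same data arrives in a different order, the positional drop silently removes another sector, while the value-based filter still does the right thing.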

In [None]:
industries_x = data.select(['industry', 'open', 'close', 'adjusted']).groupBy('industry').mean().toPandas()

industries_x.head()

In [None]:
industries_x.plot(kind = 'barh', x='industry', y = industries_x.columns.tolist()[1:], figsize=(10, 50))

Remove **Major Chemicals** and **Building Products** to view the rest of the data more clearly:

In [None]:
q = industries_x[~industries_x.industry.isin(['Major Chemicals', 'Building Products'])]

q.plot(kind = 'barh', x='industry', y = q.columns.tolist()[1:], figsize=(10, 50), xlabel='Stock Price', ylabel = 'Industry')

plt.show()

In [None]:
import pyspark.sql.functions as f

health = data.filter(f.col('sector') == 'Health Care')

health.show()

### How to use Aggregation

In [None]:
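# Note: importing min and max by name shadows Python's built-in min() and max()
# for the rest of the session.
from pyspark.sql.functions import col, min, max, avg, lit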

data.groupBy("sector") \
    .agg(min("data").alias("From"), 
         max("data").alias("To"), 
         
         min("open").alias("Minimum Opening"),
         max("open").alias("Maximum Opening"), 
         avg("open").alias("Average Opening"), 

         min("close").alias("Minimum Closing"), 
         max("close").alias("Maximum Closing"), 
         avg("close").alias("Average Closing"), 

         min("adjusted").alias("Minimum Adjusted Closing"), 
         max("adjusted").alias("Maximum Adjusted Closing"), 
         avg("adjusted").alias("Average Adjusted Closing"), 

      ).show(truncate=False)

Get the minimum, maximum, and average prices per sector from **January 2019** to **January 2020**:

In [None]:
data.filter( (col('data') >= lit('2019-01-02')) & (col('data') <= lit('2020-01-31')) )\
    .groupBy("sector") \
    .agg(min("data").alias("From"), 
         max("data").alias("To"), 
         
         min("open").alias("Minimum Opening"),
         max("open").alias("Maximum Opening"), 
         avg("open").alias("Average Opening"), 

         min("close").alias("Minimum Closing"), 
         max("close").alias("Maximum Closing"), 
         avg("close").alias("Average Closing"), 

         min("adjusted").alias("Minimum Adjusted Closing"), 
         max("adjusted").alias("Maximum Adjusted Closing"), 
         avg("adjusted").alias("Average Adjusted Closing"), 

      ).show(truncate=False)

Plot the time series of **Technology** sector stock prices:

In [None]:
tech = data.where(col('sector') == 'Technology').select('data', 'open', 'close', 'adjusted')

tech.show()

In [None]:
fig, axes = plt.subplots(nrows=3, ncols=1, figsize=(60, 30))

# Collect the Spark DataFrame to pandas once instead of once per plot
tech_pd = tech.toPandas()

tech_pd.plot(kind='line', x='data', y='open', xlabel='Date Range', ylabel='Stock Opening Price', ax=axes[0], color='mediumspringgreen')

tech_pd.plot(kind='line', x='data', y='close', xlabel='Date Range', ylabel='Stock Closing Price', ax=axes[1], color='tomato')

tech_pd.plot(kind='line', x='data', y='adjusted', xlabel='Date Range', ylabel='Stock Adjusted Price', ax=axes[2], color='orange')

plt.show()

In [None]:
data.select('sector').show(5)

In [None]:
data.select(['open', 'close', 'adjusted']).show(5)

In [None]:
data.filter(data.adjusted.between(100.0, 500.0)).show(5)

In [None]:
from pyspark.sql.functions import col, lit

data.filter( (col('data') >= lit('2020-01-01')) & (col('data') <= lit('2020-01-31')) ).show(5)

In [None]:
data.select('open', 'close', f.when(data.adjusted >= 200.0, 1).otherwise(0)).show(5)

In [None]:
data.select('sector', 
            data.sector.rlike('^[BC]').alias('Sector Starting with B or C')
            ).distinct().show()
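Inside a regex character class a comma is an ordinary character, so `^[B,C]` also matches values that start with `,`, while `^[BC]` matches only `B` or `C`. Spark's `rlike` uses Java regular expressions, whose character-class rules agree with Python's `re` on this point, so plain Python is enough to illustrate it. The sector list, including the `,Misc` edge case, is made up.

```python
import re

sectors = ["Basic Industries", "Capital Goods", "Finance", ",Misc"]

# Comma inside the brackets is treated as a literal character to match
with_comma_class = [s for s in sectors if re.match(r"^[B,C]", s)]

# Only B or C
fixed_class = [s for s in sectors if re.match(r"^[BC]", s)]

print(with_comma_class)
print(fixed_class)
```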

In [None]:
data.select(['industry', 'open', 'close', 'adjusted']).groupBy('industry').mean().show()