# Beginner's Guide to PySpark

In [2]:
# Let's start Spark Session
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local[*]")\
        .appName('PySpark_Tutorial')\
        .getOrCreate()

## Reading Data

### Download Data

You can find the stock price data [here](https://drive.google.com/file/d/19z6AKWpKOQLpOiiLZ_QoprsPtIcOipNa/view?usp=sharing).

## Import Modules

In [None]:
from pyspark.sql import functions as f

import pandas as pd

import seaborn as sns

import matplotlib.pyplot as plt

%matplotlib inline

## Read Data

In [None]:
# Before changing schema
b_data = spark.read.csv(
    'stocks_price_final.csv',
    sep = ',',
    header = True,
    )

b_data.printSchema()

We can see that all data types are **strings**. By default, `spark.read.csv` does not infer column types, so every column is read as a string. Passing `inferSchema=True` makes Spark infer the types from the data (at the cost of an extra pass over it), but the inferred types may not always be correct, and we may also want to choose our own column names. Instead, we can assign the data types manually, much as we would when creating a table in SQL. This is why it's important to get familiar with the dataset before loading it into Spark. If the dataset is huge, it's a good idea to explore a smaller sample locally in Excel or Pandas first.

In [None]:
from pyspark.sql.types import (
    DateType, DoubleType, IntegerType, StringType, StructField, StructType,
)

data_schema = [
    StructField('_c0', IntegerType(), True),
    StructField('symbol', StringType(), True),
    StructField('data', DateType(), True),
    StructField('open', DoubleType(), True),
    StructField('high', DoubleType(), True),
    StructField('low', DoubleType(), True),
    StructField('close', DoubleType(), True),
    StructField('volume', IntegerType(), True),
    StructField('adjusted', DoubleType(), True),
    StructField('market.cap', StringType(), True),
    StructField('sector', StringType(), True),
    StructField('industry', StringType(), True),
    StructField('exchange', StringType(), True),
]

final_struc = StructType(fields=data_schema)

In [None]:
data = spark.read.csv(
    'stocks_price_final.csv',
    sep = ',',
    header = True,
    schema = final_struc
    )

In [None]:
data.printSchema()

Now, we can check the first 5 rows of the data.

In [None]:
data.show(5)

In [None]:
data = data.withColumnRenamed('market.cap', 'market_cap')

## Inspect the data

**schema:** This property returns the schema of the DataFrame as a `StructType`. The example below shows it for the US stock price data.

In [None]:
data.schema

**dtypes:** It returns a list of `(column name, data type)` tuples.

In [None]:
data.dtypes

**head(n):** It returns the first n rows as a list of `Row` objects.

In [None]:
data.head(3)

**show(n):** It prints the first n rows (20 by default) as a formatted table, similar to `head()` in Pandas. It only displays the rows and returns `None`.

In [None]:
data.show(5)

**describe():** It computes summary statistics (count, mean, standard deviation, min, and max) for the numeric and string columns. It returns a DataFrame, so we need to call `.show()` to see the results.

In [3]:
# PySpark evaluates lazily: describe() only builds the query, and nothing is computed until an action such as .show() runs
data.describe().show()

**columns:** As in Pandas, it returns the column names, here as a plain Python list.

In [None]:
data.columns

**count():** It returns the number of rows in the DataFrame.

In [None]:
data.count()

If we call `distinct()` before `count()`, only distinct rows are counted. Comparing the result with `count()` is a quick way to check for duplicate rows.

In [None]:
data.distinct().count() 

## Column Operations/Manipulations

Now, we will take a look at how we can manipulate columns using PySpark, specifically at:

- adding columns
- updating columns
- deleting columns

In [None]:
# add a new column 'date' as a copy of the existing 'data' column
data = data.withColumn('date', data.data)

data.show(5)

In [None]:
# renaming column date to data_changed
data = data.withColumnRenamed('date', 'data_changed')

data.show(5)

In [None]:
# dropping column data_changed
data = data.drop('data_changed')

data.show(5)

## Dealing with Missing Values

Missing values are often part of the datasets we have to work with. There are a number of ways to handle missing data, for example:

- removing
- imputing with Mean or Median
- imputing with Most Frequent Value

In Spark we can, of course, use all of these techniques.

In [None]:
# Removing rows with missing values
# na.drop() returns a new DataFrame and does not modify `data` in place
data.na.drop()

In [None]:
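# Replacing missing values in 'open' with the column mean
mean_open = data.select(f.mean(data['open'])).collect()[0][0]
# subset restricts the fill to 'open'; without it, nulls in every numeric column would get this value
data.na.fill(mean_open, subset=['open'])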

## Querying Data

PySpark and PySpark SQL provide a wide range of methods and functions for querying data. Many of them mirror SQL clauses as closely as possible. Here are a few of the most commonly used:

- Select
- Filter
- Between
- When
- Like
- GroupBy
- Aggregations

### Select
It selects one or more columns by name. Here are two simple examples:

In [None]:
data.select('sector').show(5)

In [None]:
data.select(['open', 'close', 'adjusted']).show(5)

### Filter
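It filters rows based on a given condition. Multiple conditions can be combined with the AND (**&**), OR (**|**), and NOT (**~**) operators; wrap each condition in parentheses, since these operators bind more tightly than comparisons in Python.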

In [None]:
data.filter(data.adjusted.between(100.0, 500.0)).show(5)

In [None]:
from pyspark.sql.functions import col, lit

data.filter( (col('data') >= lit('2020-01-01')) & (col('data') <= lit('2020-01-31')) ).show(5)

### When
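`when()` works like SQL's `CASE WHEN`: it returns the value associated with the first condition that holds, and `otherwise()` supplies the fallback value. The example below selects the opening and closing prices of stocks together with a flag that is 1 when the adjusted price is greater than or equal to 200 and 0 otherwise.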

In [None]:
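# alias() gives the flag column a readable name instead of the auto-generated CASE WHEN expression
data.select('open', 'close', f.when(data.adjusted >= 200.0, 1).otherwise(0).alias('adjusted_ge_200')).show(5)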

### Like
`like()` is similar to the LIKE operator in SQL and uses the `%` and `_` wildcards, while `rlike()` matches a regular expression. The example below uses `rlike` to flag the sector names that start with either B or C.

In [None]:
data.select('sector', 
            data.sector.rlike('^[BC]').alias('Sector Starting with B or C')
            ).distinct().show()

### GroupBy
`groupBy()` groups the data by the given column(s), after which we can apply aggregations such as sum, mean, min, and max. The example below computes the average opening, closing, and adjusted closing price for each industry.

In [None]:
data.select(['industry', 'open', 'close', 'adjusted']).groupBy('industry').mean().show()

### Aggregation
`agg()` is similar to combining GROUP BY with aggregate functions in SQL: we can apply different aggregation functions, such as count, sum, min, and max, to different columns. The example below shows the minimum, maximum, and average opening, closing, and adjusted closing prices for each sector from January 2019 to January 2020.

In [None]:
data.show(2)

In [None]:
# the functions module is already imported as f; here it is imported again under the alias fs
from pyspark.sql import functions as fs

In [None]:
data.filter( (col('data') >= lit('2019-01-02')) & (col('data') <= lit('2020-01-31')) )\
    .groupBy("sector") \
    .agg(fs.min("data").alias("From"), 
         fs.max("data").alias("To"), 
         
         fs.min("open").alias("Minimum Opening"),
         fs.max("open").alias("Maximum Opening"), 
         fs.avg("open").alias("Average Opening"), 

         fs.min("close").alias("Minimum Closing"), 
         fs.max("close").alias("Maximum Closing"), 
         fs.avg("close").alias("Average Closing"), 

         fs.min("adjusted").alias("Minimum Adjusted Closing"), 
         fs.max("adjusted").alias("Maximum Adjusted Closing"), 
         fs.avg("adjusted").alias("Average Adjusted Closing"), 

      ).show(truncate=False)

## Data Visualization

We will combine PySpark with Matplotlib and Pandas to create simple visualizations. The `toPandas()` method converts a Spark DataFrame into a Pandas DataFrame, and we then use its `plot()` method to visualize the data. Because `toPandas()` collects all rows into the driver's memory, it's important to aggregate large data before converting it.

The code below displays a bar chart of the average opening, closing, and adjusted closing price for each sector.

In [None]:
sec_df =  data.select(['sector', 'open', 'close', 'adjusted']).groupBy('sector').mean().toPandas()

In [None]:
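# Exclude the row at position 6 from the chart.
# Note: the row order of a groupBy() result is not guaranteed, so filtering by sector name is more reliable.
ind = [i for i in range(len(sec_df)) if i != 6]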

sec_df.iloc[ind ,:].plot(kind = 'bar', x='sector', y = sec_df.columns.tolist()[1:], 
                         figsize=(12, 6), ylabel = 'Stock Price', xlabel = 'Sector')
plt.show()

Similarly, let’s visualize the average opening, closing, and adjusted closing price for each industry.

In [None]:
industries_x = data.select(['industry', 'open', 'close', 'adjusted']).groupBy('industry').mean().toPandas()
q  = industries_x[(industries_x.industry != 'Major Chemicals') & (industries_x.industry != 'Building Products')]
q.plot(kind = 'barh', x='industry', y = q.columns.tolist()[1:], figsize=(10, 50), xlabel='Stock Price', ylabel = 'Industry')
plt.show()

## Write/Save Data to File

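The `write` attribute returns a `DataFrameWriter`, whose `csv()` and `save()` methods store the data in formats such as CSV, JSON, and Parquet. We can save either the entire DataFrame or only the columns chosen with `select()`. Spark writes each output as a directory of part files rather than a single file, and by default it raises an error if the target path already exists.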

In [None]:
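## Writing entire data to different file formats
# mode='overwrite' replaces existing output; the default mode raises an error if the path exists

# CSV (header=True writes the column names as the first line)
data.write.csv('dataset.csv', header=True, mode='overwrite')

# JSON
data.write.save('dataset.json', format='json', mode='overwrite')

# Parquet
data.write.save('dataset.parquet', format='parquet', mode='overwrite')

## Writing selected data to different file formats
# Separate paths, so the selected-column output does not replace the full dataset

# CSV
data.select(['data', 'open', 'close', 'adjusted'])\
    .write.csv('selected_dataset.csv', header=True, mode='overwrite')

# JSON
data.select(['data', 'open', 'close', 'adjusted'])\
    .write.save('selected_dataset.json', format='json', mode='overwrite')

# Parquet
data.select(['data', 'open', 'close', 'adjusted'])\
    .write.save('selected_dataset.parquet', format='parquet', mode='overwrite')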