<a href="https://colab.research.google.com/github/rafaeldeflon/Exploratory_Data_Analysis/blob/main/PySpark_Intro_Study.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PySpark Dataframes
For each instruction, type the appropriate code into the cell below the instruction. Then, run the code by pressing the Run button above.

1. Import the SparkSession class:

from pyspark.sql import SparkSession

In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m2.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425345 sha256=b3a159f33ebcf9133c021347fcbc0e574d2bab0134edb1f2eb7757d320f3cbdd
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [3]:
from pyspark.sql import SparkSession

2. Use this class to instantiate a Spark session:
    
    spark = SparkSession \
    .builder \
    .appName("My First PySpark App") \
    .getOrCreate()

In [6]:
spark = SparkSession \
  .builder \
  .appName("My First PySpark App") \
  .getOrCreate()


3. Take a look at the session object:

spark

In [7]:
spark

4. Read the contents of a CSV file into a Dataframe named 'accounts':

accounts = spark.read.option('header', 'true').csv('./data/accounts.csv')

In [11]:
!ls sample_data/

anscombe.json		     california_housing_train.csv  mnist_train_small.csv
california_housing_test.csv  mnist_test.csv		   README.md


In [12]:
california_housing = spark.read.option('header', 'true').csv('./sample_data/california_housing_test.csv')

5. Take a look at the Dataframe's schema:

accounts.printSchema()

In [13]:
california_housing.printSchema()

root
 |-- longitude: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- housing_median_age: string (nullable = true)
 |-- total_rooms: string (nullable = true)
 |-- total_bedrooms: string (nullable = true)
 |-- population: string (nullable = true)
 |-- households: string (nullable = true)
 |-- median_income: string (nullable = true)
 |-- median_house_value: string (nullable = true)



6. Read the contents of a Parquet file into a Dataframe named 'transactions':

transactions = spark.read.parquet('./data/transactions.parquet')

In [14]:
california_housing_train = spark.read.parquet('./sample_data/california_housing_train.parquet')

7. See how many rows are in the new Dataframe:

transactions.count()

In [15]:
california_housing_train.count()

17000

In [16]:
# Show a random sample of roughly 10% of the rows of california_housing_train
california_housing_train.sample(0.1).show()


+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0|
|   -114.6|   34.83|              46.0|     1497.0|         309.0|     787.0|     271.0|       2.1908|           48100.0|
|  -115.48|   32.68|              15.0|     3414.0|         666.0|    2097.0|     622.0|       2.3319|           91200.0|
|  -115.49|   32.87|              19.0|      541.0|         104.0|     457.0|     106.0|       3.3583|          102800.0|
|   -115.5|   32.68|              18.0|     3631.0|         913.0|    3565.0|     924.0|       1.5931|           88400.0|
|  -115.53|   34.91|    

In [17]:
# Add a column 'latitude_int' holding floor('latitude') (the integer part, since these latitudes are positive) so rows can be grouped by whole degree of latitude

from pyspark.sql.functions import floor

california_housing_train = california_housing_train.withColumn('latitude_int', floor(california_housing_train['latitude']))


In [18]:
california_housing_train.sample(0.1).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|latitude_int|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+------------+
|  -114.65|   34.89|              17.0|     2556.0|         587.0|    1005.0|     401.0|       1.6991|           69100.0|          34|
|  -115.51|   33.12|              21.0|     1024.0|         218.0|     890.0|     232.0|        2.101|           46700.0|          33|
|  -115.52|   32.98|              32.0|     1615.0|         382.0|    1307.0|     345.0|       1.4583|           58600.0|          32|
|  -115.55|    32.8|              23.0|      666.0|         142.0|     580.0|     160.0|       2.1136|           61000.0|          32|
|  -115.55|   32.79|              23.0|     1004.0|    

8. Make a new Dataframe by grouping the transactions by account number and summing the groups. This will combine the transactions per account:

account_transactions = transactions.groupby('account_number').sum()

In [19]:
housing_latitude = california_housing_train.groupby('latitude_int').sum('median_income')

9. Combine the accounts with the summed transaction values:

with_sum = accounts.join(account_transactions, 'account_number', 'inner')

10. Get the current balance per account by summing the transaction sums with the initial account balance:

accounts = with_sum.withColumn('new_balance', with_sum.balance + with_sum['sum(amount)'])

11. Get accounts with negative current balances:

neg_balance = accounts.filter(accounts.new_balance < 0)

12. Read client data from a json file:

clients = spark.read.json('./data/clients.json')

13. Get the clients with a negative balance:

clients = clients.join(neg_balance, 'account_number', 'inner')

14. Look at the top five clients with negative balances:

clients.select(['first_name', 'last_name', 'account_number', 'new_balance']).show(5)