## Create Spark session

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("Jupyter with Spark Cluster") \
    .master("spark://spark-master:7077") \
    .getOrCreate()

In [4]:
spark

## Read csv file with spark session

In [2]:
df = spark.read.csv("/home/jovyan/data/email.csv", header=True, inferSchema=True, sep=";")

In [3]:
df.show()

+-----------------+----------+----------+---------+
|      Login email|Identifier|First name|Last name|
+-----------------+----------+----------+---------+
|laura@example.com|      2070|     Laura|     Grey|
|craig@example.com|      4081|     Craig|  Johnson|
| mary@example.com|      9346|      Mary|  Jenkins|
|jamie@example.com|      5079|     Jamie|    Smith|
+-----------------+----------+----------+---------+



In [9]:
df.printSchema()

root
 |-- Login email: string (nullable = true)
 |-- Identifier: integer (nullable = true)
 |-- First name: string (nullable = true)
 |-- Last name: string (nullable = true)



# Read Parquet files

In [5]:
bank_failures_df = spark.read.parquet("/home/jovyan/data/bank_failures.parquet")

In [10]:
bank_failures_df.show()

+---+--------------------+-------------+-------------+----------+--------------------+--------------+
| c1|                Bank|         City|        State|      Date|         Acquired by|Assets ($mil.)|
+---+--------------------+-------------+-------------+----------+--------------------+--------------+
|  1|Douglass National...|  Kansas City|     Missouri|2008-01-25|Liberty Bank and ...|          58.5|
|  2|           Hume Bank|         Hume|     Missouri|2008-03-07|       Security Bank|          18.7|
|  3|        Bear Stearns|New York City|     New York|2008-03-16|   J.P. Morgan Chase|      395000.0|
|  4|  ANB Financial N.A.|  Bentonville|     Arkansas|2008-05-09|Pulaski Bank and ...|        2100.0|
|  5|First Integrity B...|      Staples|    Minnesota|2008-05-30|First Internation...|          54.7|
|  6|             IndyMac|     Pasadena|   California|2008-07-11|   OneWest Bank, FSB|       32000.0|
|  7|First National Ba...|         Reno|       Nevada|2008-07-25|Mutual of Omaha B

In [7]:
bank_failures_df.printSchema()

root
 |-- c1: long (nullable = true)
 |-- Bank: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Date: date (nullable = true)
 |-- Acquired by: string (nullable = true)
 |-- Assets ($mil.): double (nullable = true)



In [11]:
spark.sql("""
            SELECT 
                *,
                State AS St
            FROM {df}
            LIMIT 10
          """, df=bank_failures_df).show()

+---+--------------------+-------------+----------+----------+--------------------+--------------+----------+
| c1|                Bank|         City|     State|      Date|         Acquired by|Assets ($mil.)|        St|
+---+--------------------+-------------+----------+----------+--------------------+--------------+----------+
|  1|Douglass National...|  Kansas City|  Missouri|2008-01-25|Liberty Bank and ...|          58.5|  Missouri|
|  2|           Hume Bank|         Hume|  Missouri|2008-03-07|       Security Bank|          18.7|  Missouri|
|  3|        Bear Stearns|New York City|  New York|2008-03-16|   J.P. Morgan Chase|      395000.0|  New York|
|  4|  ANB Financial N.A.|  Bentonville|  Arkansas|2008-05-09|Pulaski Bank and ...|        2100.0|  Arkansas|
|  5|First Integrity B...|      Staples| Minnesota|2008-05-30|First Internation...|          54.7| Minnesota|
|  6|             IndyMac|     Pasadena|California|2008-07-11|   OneWest Bank, FSB|       32000.0|California|
|  7|First

In [12]:
new_bank_failures_df = bank_failures_df.withColumnRenamed("Assets ($mil.)","Assets")
new_bank_failures_df.show()

+---+--------------------+-------------+-------------+----------+--------------------+--------+
| c1|                Bank|         City|        State|      Date|         Acquired by|  Assets|
+---+--------------------+-------------+-------------+----------+--------------------+--------+
|  1|Douglass National...|  Kansas City|     Missouri|2008-01-25|Liberty Bank and ...|    58.5|
|  2|           Hume Bank|         Hume|     Missouri|2008-03-07|       Security Bank|    18.7|
|  3|        Bear Stearns|New York City|     New York|2008-03-16|   J.P. Morgan Chase|395000.0|
|  4|  ANB Financial N.A.|  Bentonville|     Arkansas|2008-05-09|Pulaski Bank and ...|  2100.0|
|  5|First Integrity B...|      Staples|    Minnesota|2008-05-30|First Internation...|    54.7|
|  6|             IndyMac|     Pasadena|   California|2008-07-11|   OneWest Bank, FSB| 32000.0|
|  7|First National Ba...|         Reno|       Nevada|2008-07-25|Mutual of Omaha Bank|  3400.0|
|  8|First Heritage Ba...|Newport Beach|

In [13]:
import pyspark.sql.functions as F
new_bank_failures_df = (new_bank_failures_df
                        .withColumn("Year", F.year(F.col("Date")))
                        .withColumn("Month", F.month(F.col("Date")))
                        .withColumn("Day", F.month(F.col("Date"))))
new_bank_failures_df.show()

+---+--------------------+-------------+-------------+----------+--------------------+--------+----+-----+---+
| c1|                Bank|         City|        State|      Date|         Acquired by|  Assets|Year|Month|Day|
+---+--------------------+-------------+-------------+----------+--------------------+--------+----+-----+---+
|  1|Douglass National...|  Kansas City|     Missouri|2008-01-25|Liberty Bank and ...|    58.5|2008|    1|  1|
|  2|           Hume Bank|         Hume|     Missouri|2008-03-07|       Security Bank|    18.7|2008|    3|  3|
|  3|        Bear Stearns|New York City|     New York|2008-03-16|   J.P. Morgan Chase|395000.0|2008|    3|  3|
|  4|  ANB Financial N.A.|  Bentonville|     Arkansas|2008-05-09|Pulaski Bank and ...|  2100.0|2008|    5|  5|
|  5|First Integrity B...|      Staples|    Minnesota|2008-05-30|First Internation...|    54.7|2008|    5|  5|
|  6|             IndyMac|     Pasadena|   California|2008-07-11|   OneWest Bank, FSB| 32000.0|2008|    7|  7|
|

In [None]:
#jdsjhdsh