#### Exercise

1. Create a Spark session
2. Load CSV data
3. Check schema and show dataframe
4. Check data types and column names
5. Drop column(s)
6. Rename column(s)
7. Summary statistics (describe)
8. Check missing values
9. Use SQL queries
10. Add column(s)
11. Filter and aggregate
12. Stop the Spark session


#### Spark session

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, count, when

In [2]:
# Reuse the active Spark session if one exists, otherwise start one named 'DataFrames'.
spark = (
    SparkSession.builder
    .appName('DataFrames')
    .getOrCreate()
)

# Bare expression so the notebook displays the session object.
spark

#### CSV load


In [3]:
# Baseline read with no options: columns get the default _cN names and are all typed as string.
spark.read.format('csv').load('housing.csv').printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)
 |-- _c4: string (nullable = true)
 |-- _c5: string (nullable = true)
 |-- _c6: string (nullable = true)



In [4]:
# Treat the first line as the header so columns get real names; types are still all string.
(
    spark.read
    .option('header', True)
    .csv('housing.csv')
    .printSchema()
)

root
 |-- Avg. Area Income: string (nullable = true)
 |-- Avg. Area House Age: string (nullable = true)
 |-- Avg. Area Number of Rooms: string (nullable = true)
 |-- Avg. Area Number of Bedrooms: string (nullable = true)
 |-- Area Population: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- Address: string (nullable = true)



In [5]:
# Also ask Spark to infer column types from the data instead of defaulting to string.
(
    spark.read
    .options(header=True, inferSchema=True)
    .csv('housing.csv')
    .printSchema()
)

root
 |-- Avg. Area Income: string (nullable = true)
 |-- Avg. Area House Age: string (nullable = true)
 |-- Avg. Area Number of Rooms: double (nullable = true)
 |-- Avg. Area Number of Bedrooms: double (nullable = true)
 |-- Area Population: double (nullable = true)
 |-- Price: double (nullable = true)
 |-- Address: string (nullable = true)



In [None]:
# Preview two rows as parsed with header + inferSchema (no multiLine yet).
(
    spark.read
    .options(header=True, inferSchema=True)
    .csv('housing.csv')
    .show(2)
)

In [6]:
# With multiLine=True the first two columns are inferred as double rather than string
# (presumably because some records span several lines -- confirm against the raw file).
(
    spark.read
    .options(header=True, inferSchema=True, multiLine=True)
    .csv('housing.csv')
    .printSchema()
)

root
 |-- Avg. Area Income: double (nullable = true)
 |-- Avg. Area House Age: double (nullable = true)
 |-- Avg. Area Number of Rooms: double (nullable = true)
 |-- Avg. Area Number of Bedrooms: double (nullable = true)
 |-- Area Population: double (nullable = true)
 |-- Price: double (nullable = true)
 |-- Address: string (nullable = true)



In [7]:
# Load the housing data with the reader options settled on above and keep it as df.
df = (
    spark.read
    .options(header=True, inferSchema=True, multiLine=True)
    .csv('housing.csv')
)


#### Check schema and show dataframe

In [8]:
# Print each column's name, data type and nullability for the loaded DataFrame.
df.printSchema()

root
 |-- Avg. Area Income: double (nullable = true)
 |-- Avg. Area House Age: double (nullable = true)
 |-- Avg. Area Number of Rooms: double (nullable = true)
 |-- Avg. Area Number of Bedrooms: double (nullable = true)
 |-- Area Population: double (nullable = true)
 |-- Price: double (nullable = true)
 |-- Address: string (nullable = true)



In [9]:
# Preview the first 3 rows.
df.show(3)

+----------------+-------------------+-------------------------+----------------------------+---------------+-----------+--------------------+
|Avg. Area Income|Avg. Area House Age|Avg. Area Number of Rooms|Avg. Area Number of Bedrooms|Area Population|      Price|             Address|
+----------------+-------------------+-------------------------+----------------------------+---------------+-----------+--------------------+
|     79545.45857|        5.682861322|              7.009188143|                        4.09|     23086.8005|1059033.558|208 Michael Ferry...|
|     79248.64245|        6.002899808|              6.730821019|                        3.09|    40173.07217|1505890.915|188 Johnson Views...|
|     61287.06718|         5.86588984|               8.51272743|                        5.13|     36882.1594|1058987.988|9127 Elizabeth St...|
+----------------+-------------------+-------------------------+----------------------------+---------------+-----------+--------------------+

#### Check data types and column names

In [10]:
# List the column names.
df.columns

['Avg. Area Income',
 'Avg. Area House Age',
 'Avg. Area Number of Rooms',
 'Avg. Area Number of Bedrooms',
 'Area Population',
 'Price',
 'Address']

In [11]:
# List (column name, data type) pairs.
df.dtypes

[('Avg. Area Income', 'double'),
 ('Avg. Area House Age', 'double'),
 ('Avg. Area Number of Rooms', 'double'),
 ('Avg. Area Number of Bedrooms', 'double'),
 ('Area Population', 'double'),
 ('Price', 'double'),
 ('Address', 'string')]

#### Drop column(s)

In [12]:
# Preview the data without the Address column; the result is not assigned, so df is unchanged here.
df.drop('Address').show()

+----------------+-------------------+-------------------------+----------------------------+---------------+-----------+
|Avg. Area Income|Avg. Area House Age|Avg. Area Number of Rooms|Avg. Area Number of Bedrooms|Area Population|      Price|
+----------------+-------------------+-------------------------+----------------------------+---------------+-----------+
|     79545.45857|        5.682861322|              7.009188143|                        4.09|     23086.8005|1059033.558|
|     79248.64245|        6.002899808|              6.730821019|                        3.09|    40173.07217|1505890.915|
|     61287.06718|         5.86588984|               8.51272743|                        5.13|     36882.1594|1058987.988|
|     63345.24005|        7.188236095|              5.586728665|                        3.26|    34310.24283|1260616.807|
|     59982.19723|        5.040554523|              7.839387785|                        4.23|    26354.10947|630943.4893|
|     80175.75416|      

In [13]:
# Rebind df to the version without the Address column; the cells that follow work on this df.
df = df.drop('Address')

In [14]:
# Confirm the Address column is gone by previewing the first 2 rows.
df.show(2)

+----------------+-------------------+-------------------------+----------------------------+---------------+-----------+
|Avg. Area Income|Avg. Area House Age|Avg. Area Number of Rooms|Avg. Area Number of Bedrooms|Area Population|      Price|
+----------------+-------------------+-------------------------+----------------------------+---------------+-----------+
|     79545.45857|        5.682861322|              7.009188143|                        4.09|     23086.8005|1059033.558|
|     79248.64245|        6.002899808|              6.730821019|                        3.09|    40173.07217|1505890.915|
+----------------+-------------------+-------------------------+----------------------------+---------------+-----------+
only showing top 2 rows



#### Rename column(s)

In [15]:
# Keep the current column names; they are the inputs for building the rename mapping.
column_names = df.columns

In [33]:
# Try the renaming rule on one column: keep the text after 'Area', trim, lowercase, snake_case.
column_names[1].split('Area')[-1].strip().lower().replace(' ', '_')

'house_age'

In [34]:
def col_rename(x):
    """Convert an original column header to a short snake_case name.

    Keeps only the text after the last 'Area' in ``x`` (the whole string when
    'Area' is absent), trims surrounding whitespace, lowercases it and joins
    the space-separated words with underscores, e.g.
    'Avg. Area House Age' -> 'house_age' and 'Price' -> 'price'.
    """
    return '_'.join(word.lower() for word in x.split('Area')[-1].strip().split(' '))


# Map every original column name to its cleaned-up replacement.
column_map = {c: col_rename(c) for c in column_names}
column_map

{'Avg. Area Income': 'income',
 'Avg. Area House Age': 'house_age',
 'Avg. Area Number of Rooms': 'number_of_rooms',
 'Avg. Area Number of Bedrooms': 'number_of_bedrooms',
 'Area Population': 'population',
 'Price': 'price'}

In [35]:
# Apply the mapping one column at a time, rebinding df after each rename.
for old_name, new_name in column_map.items():
    df = df.withColumnRenamed(old_name, new_name)

In [36]:
# Verify the renamed columns.
df.columns

['income',
 'house_age',
 'number_of_rooms',
 'number_of_bedrooms',
 'population',
 'price']

#### Summary statistics or info

In [37]:
# Summary statistics (count, mean, stddev, min, max) for every column.
df.describe(*df.columns).show()

+-------+------------------+------------------+------------------+------------------+-----------------+------------------+
|summary|            income|         house_age|   number_of_rooms|number_of_bedrooms|       population|             price|
+-------+------------------+------------------+------------------+------------------+-----------------+------------------+
|  count|              5000|              5000|              5000|              5000|             5000|              5000|
|   mean| 68583.10898397019| 5.977222035287008| 6.987791850909204|3.9813299999999967|36163.51603854035|1232072.6541452995|
| stddev|10657.991213888685|0.9914561798324225|1.0058332312754115|1.2341372654846832|9925.650113546026| 353117.6265836953|
|    min|       17796.63119|       2.644304186|       3.236194023|               2.0|      172.6106863|       15938.65792|
|    max|       107701.7484|       9.519088066|       10.75958834|               6.5|      69621.71338|       2469065.594|
+-------+-------

#### Check missing values

In [39]:
# Count the rows that have a non-null number_of_rooms value.
df.where(df['number_of_rooms'].isNotNull()).count()

5000

In [40]:
# Count nulls for every column in a single aggregation (one pass over the data)
# instead of launching a separate filter/count job per column.
# count() ignores nulls, so when(<is null>, 1) contributes 1 only for null cells.
null_counts = df.select(
    [count(when(col(c).isNull(), 1)).alias(c) for c in df.columns]
).first().asDict()

# Total number of null cells across all columns.
missing_values = sum(null_counts.values())
missing_values

0

#### Use SQL to check the number of rows in the dataframe



In [41]:
# Register the DataFrame as the temp view 'Housing' so it can be queried with SQL,
# then count its rows.
df.createOrReplaceTempView('Housing')
row_count_query = "SELECT count(*) from Housing"
spark.sql(row_count_query).show()

+--------+
|count(1)|
+--------+
|    5000|
+--------+



#### Add columns

In [42]:
# Preview a derived rooms-to-bedrooms ratio column; the result is not assigned, so df is unchanged.
(
    df
    .withColumn('R2BRRatio', col('number_of_rooms') / col('number_of_bedrooms'))
    .show(2)
)

+-----------+-----------+---------------+------------------+-----------+-----------+------------------+
|     income|  house_age|number_of_rooms|number_of_bedrooms| population|      price|         R2BRRatio|
+-----------+-----------+---------------+------------------+-----------+-----------+------------------+
|79545.45857|5.682861322|    7.009188143|              4.09| 23086.8005|1059033.558|1.7137379322738389|
|79248.64245|6.002899808|    6.730821019|              3.09|40173.07217|1505890.915|2.1782592294498384|
+-----------+-----------+---------------+------------------+-----------+-----------+------------------+
only showing top 2 rows



#### Filtering and aggregate

In [43]:
# Count the houses priced above 1.5 million dollars (i.e., 1500000).
n = 1_500_000
houses_above_n = df.where(col('price') > n).count()
print(f"This dataset has {houses_above_n} houses above {n} dollars price.")

This dataset has 1135 houses above 1500000 dollars price.


In [44]:
# Average rooms-to-bedrooms ratio for houses priced above 2 million dollars:
# first keep only those houses and attach the ratio as a new column.
n = 2_000_000
df_expensive = (
    df
    .where(col('price') > n)
    .withColumn('R2BRRatio', col('number_of_rooms') / col('number_of_bedrooms'))
)

In [45]:
# Preview the first 5 expensive houses together with their ratio column.
df_expensive.show(5)

+-----------+-----------+---------------+------------------+-----------+-----------+------------------+
|     income|  house_age|number_of_rooms|number_of_bedrooms| population|      price|         R2BRRatio|
+-----------+-----------+---------------+------------------+-----------+-----------+------------------+
|86294.99909| 6.62745694|    8.011897853|              4.07|47560.77534| 2146925.34| 1.968525271007371|
|95450.29309|6.595060685|    6.850361008|              3.33|39388.51552|2014851.344| 2.057165467867868|
| 85845.3178|6.743652961|    9.468766369|              3.46|46477.67868|2152959.409|2.7366376789017344|
|91159.41833|6.536045428|    7.373850919|              3.01| 54861.0911|2298379.487| 2.449784358471761|
|87266.34023|8.248959366|    7.234261404|               5.0|45161.18768|2249122.541|      1.4468522808|
+-----------+-----------+---------------+------------------+-----------+-----------+------------------+
only showing top 5 rows



In [46]:
# Average the ratio over all expensive houses (single-row result).
df_expensive.agg({'R2BRRatio': 'avg'}).show()

+------------------+
|    avg(R2BRRatio)|
+------------------+
|1.9771330636775544|
+------------------+



#### Stop session when done

In [47]:
# Stop the Spark session now that the exercise is finished.
spark.stop()