In [None]:
#importing Libraries
import pyspark.sql.types as T
import pyspark.sql.functions as F

In [None]:
# import all
from pyspark.sql.types import *
from pyspark.sql.functions import *

# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round
from pyspark.sql.functions import count

In [None]:
# importing sparks session
from pyspark.sql import SparkSession

In [None]:
# creating spark session
spark = SparkSession.builder.appName("MySparkApp").getOrCreate()

In [None]:
# creating a dataframe
test = spark.createDataFrame(
    data = [("Prachiti", "Data Analyst"), ("Jayant Kumar", "data Analyst")],
    schema = ["name", "role"]
)

In [None]:
display(test)

DataFrame[name: string, role: string]

In [None]:
# reading csv and loading it into a dataframe
df_csv = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load("/content/sample_data/california_housing_test.csv")
)
display(df_csv )

DataFrame[longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double]

# Column Operations

In [None]:
# selecting columns from a dataframe
df_csv.select(
    col('longitude'),
    col('latitude'),
    col('housing_median_age'),
    col('total_rooms'),
    col('total_bedrooms'),
    col('population'),
    col('households'),
    col('median_income'),
    col('median_house_value')
)

DataFrame[longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double]

In [None]:
# displating 3 rows from a dataframe
df_csv.show(3)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -122.05|   37.37|              27.0|     3885.0|         661.0|    1537.0|     606.0|       6.6085|          344700.0|
|   -118.3|   34.26|              43.0|     1510.0|         310.0|     809.0|     277.0|        3.599|          176500.0|
|  -117.81|   33.78|              27.0|     3589.0|         507.0|    1484.0|     495.0|       5.7934|          270500.0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
only showing top 3 rows



In [None]:
# creating a column in dataframe
df_transformed = df_csv.withColumn("top_house_value", col("median_house_value") > 176500)
df_transformed.show(3)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|top_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.05|   37.37|              27.0|     3885.0|         661.0|    1537.0|     606.0|       6.6085|          344700.0|           true|
|   -118.3|   34.26|              43.0|     1510.0|         310.0|     809.0|     277.0|        3.599|          176500.0|          false|
|  -117.81|   33.78|              27.0|     3589.0|         507.0|    1484.0|     495.0|       5.7934|          270500.0|           true|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
only showing top 3 rows



In [None]:
# applying filters in a dataframe using AND operator
df_transformed = df_transformed.filter((col("top_house_value") == True) & (col("population") >= 1000))
df_transformed.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|top_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.05|   37.37|              27.0|     3885.0|         661.0|    1537.0|     606.0|       6.6085|          344700.0|           true|
|  -117.81|   33.78|              27.0|     3589.0|         507.0|    1484.0|     495.0|       5.7934|          270500.0|           true|
|  -122.84|    38.4|              15.0|     3080.0|         617.0|    1446.0|     599.0|       3.6696|          194400.0|           true|
|  -117.03|   32.97|              16.0|     3936.0|         694.0|    1935.0|     659.0|       4.5625|          231200.0|           true|
|  -117.97|   33.73|              

In [None]:
# applying filters using OR operator
df_transformed = df_transformed.filter((col("housing_median_age") > 16) | (col("total_bedrooms") > 617))
df_transformed.show(3)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|top_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.05|   37.37|              27.0|     3885.0|         661.0|    1537.0|     606.0|       6.6085|          344700.0|           true|
|  -117.81|   33.78|              27.0|     3589.0|         507.0|    1484.0|     495.0|       5.7934|          270500.0|           true|
|  -117.03|   32.97|              16.0|     3936.0|         694.0|    1935.0|     659.0|       4.5625|          231200.0|           true|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
only showing top 3 rows



In [None]:
# filtering distinct rows
df_transformed = df_transformed.distinct()
df_transformed.show(3)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|top_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.09|    37.7|              33.0|     4413.0|        1107.0|    2239.0|    1051.0|       2.9861|          208200.0|           true|
|  -122.45|   37.74|              38.0|     5688.0|         930.0|    2263.0|     908.0|        6.203|          346800.0|           true|
|  -118.32|   34.08|              52.0|     2370.0|         473.0|    1053.0|     434.0|       4.1429|          380300.0|           true|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
only showing top 3 rows



In [None]:
df_transformed = df_transformed.na.drop()

In [None]:
df_transformed = df_transformed.na.drop('any')

In [None]:
df_transformed = df_transformed.na.drop("all", subset=["population", "households"])

In [None]:
df_transformed.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|top_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.09|    37.7|              33.0|     4413.0|        1107.0|    2239.0|    1051.0|       2.9861|          208200.0|           true|
|  -122.45|   37.74|              38.0|     5688.0|         930.0|    2263.0|     908.0|        6.203|          346800.0|           true|
|  -118.32|   34.08|              52.0|     2370.0|         473.0|    1053.0|     434.0|       4.1429|          380300.0|           true|
|  -118.35|   33.99|              48.0|     2741.0|         439.0|    1115.0|     459.0|       5.0514|          269100.0|           true|
|  -119.19|   34.22|              



# Row Operations

In [None]:
#first dataframe
df_1 = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load("/content/sample_data/california_housing_test.csv")
)
display(df_1)

DataFrame[longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double]

In [None]:
#second dataframe
df_2 = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load("/content/sample_data/california_housing_train.csv")
)
display(df_2)

DataFrame[longitude: double, latitude: double, housing_median_age: double, total_rooms: double, total_bedrooms: double, population: double, households: double, median_income: double, median_house_value: double]

In [None]:
# union
df = df_1.union(df_2)

In [None]:
# union all
df_union_all = df_1.unionAll(df_2)

In [None]:
df.show(3)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -122.05|   37.37|              27.0|     3885.0|         661.0|    1537.0|     606.0|       6.6085|          344700.0|
|   -118.3|   34.26|              43.0|     1510.0|         310.0|     809.0|     277.0|        3.599|          176500.0|
|  -117.81|   33.78|              27.0|     3589.0|         507.0|    1484.0|     495.0|       5.7934|          270500.0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
only showing top 3 rows



In [None]:
df = df.orderBy(col("median_income"))
df.show(3)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -118.28|   34.02|              29.0|      515.0|         229.0|    2690.0|     217.0|       0.4999|          500001.0|
|  -117.75|   34.06|              52.0|       62.0|           9.0|      44.0|      16.0|       0.4999|          112500.0|
|  -118.28|   33.93|              52.0|      117.0|          33.0|      74.0|      45.0|       0.4999|           90600.0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
only showing top 3 rows



In [None]:
df = df.sort(col("households").desc())
df.show(3)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -121.79|   36.64|              11.0|    32627.0|        6445.0|   28566.0|    6082.0|       2.3087|          118800.0|
|  -117.74|   33.89|               4.0|    37937.0|        5471.0|   16122.0|    5189.0|       7.4947|          366300.0|
|  -117.78|   34.03|               8.0|    32054.0|        5290.0|   15507.0|    5050.0|       6.0191|          253900.0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
only showing top 3 rows



In [None]:
df = df.sort(col("households").desc(), col("total_bedrooms").asc())
df.show(3)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -121.79|   36.64|              11.0|    32627.0|        6445.0|   28566.0|    6082.0|       2.3087|          118800.0|
|  -117.74|   33.89|               4.0|    37937.0|        5471.0|   16122.0|    5189.0|       7.4947|          366300.0|
|  -117.78|   34.03|               8.0|    32054.0|        5290.0|   15507.0|    5050.0|       6.0191|          253900.0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
only showing top 3 rows



# Joining Dataframes

In [None]:
df_1.show(2)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -122.05|   37.37|              27.0|     3885.0|         661.0|    1537.0|     606.0|       6.6085|          344700.0|
|   -118.3|   34.26|              43.0|     1510.0|         310.0|     809.0|     277.0|        3.599|          176500.0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
only showing top 2 rows



In [None]:
df_2.show(2)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
only showing top 2 rows



In [None]:
df_joined = df_1.join(
    df_2,
    on = (df_1['housing_median_age'] == df_2['housing_median_age']) & (df_1['housing_median_age'] > 25) ,
    how = "inner"
)

In [None]:
df_joined.show(2)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -121.92|   36.95|              29.0|     3457.0|         699.0|    1327.0|     563.0|       3.6597|          252300.0|  -114.58|   33.63|              29.0|     1387.0|         236.0|     671.0|     239.0|       3.3438|           74000.0|
|  -122.11|   37.14|            

# Aggregattion

In [None]:
df_1.show(1)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -122.05|   37.37|              27.0|     3885.0|         661.0|    1537.0|     606.0|       6.6085|          344700.0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
only showing top 1 row



In [None]:
df_agg = df_1.groupBy("housing_median_age").agg(avg(df_1['total_rooms']))

In [None]:

df_agg.show(5)


+------------------+------------------+
|housing_median_age|  avg(total_rooms)|
+------------------+------------------+
|               8.0|           3311.52|
|               7.0|           3718.25|
|              49.0|1492.8095238095239|
|              29.0| 2524.306666666667|
|              47.0|            1786.5|
+------------------+------------------+
only showing top 5 rows



In [None]:
df_agg = df_agg.withColumnRenamed("housing_median_age" , "avg_total_rooms")

In [None]:
df_agg.show(5)

+---------------+------------------+
|avg_total_rooms|  avg(total_rooms)|
+---------------+------------------+
|            8.0|           3311.52|
|            7.0|           3718.25|
|           49.0|1492.8095238095239|
|           29.0| 2524.306666666667|
|           47.0|            1786.5|
+---------------+------------------+
only showing top 5 rows



# Chained code

In [None]:
df_chained = (df_1.filter(col("housing_median_age") > 20)
             .groupBy(col("housing_median_age"))
             .agg(avg(col("total_rooms")).alias("total_rooms"))
             .sort(col("housing_median_age").desc())
              )

In [None]:
df_chained.show(10)

+------------------+------------------+
|housing_median_age|       total_rooms|
+------------------+------------------+
|              52.0|1831.2196531791908|
|              51.0| 1641.909090909091|
|              50.0|         1779.3125|
|              49.0|1492.8095238095239|
|              48.0|1648.8823529411766|
|              47.0|            1786.5|
|              46.0|2031.6097560975609|
|              45.0|1677.4901960784314|
|              44.0|1862.4901960784314|
|              43.0|1842.3392857142858|
+------------------+------------------+
only showing top 10 rows

