In [1]:
# import pyspark
import pyspark

**When you see import pyspark in Python code, it means you are loading the PySpark library into your program so you can use Apache Spark’s functionality from Python.**

In [2]:
# from pyspark.sql import SparkSession
from pyspark.sql import SparkSession

you’re telling Python to import the SparkSession class from the pyspark.sql module.

In [4]:
# spark = SparkSession.builder.getOrCreate()
spark = SparkSession.builder.getOrCreate()

SparkSession → the main entry point to Spark functionality (DataFrames, SQL, etc.).

.builder → starts the configuration process for creating a new Spark session.

.getOrCreate() → checks if a Spark session already exists:

If yes → returns the existing session.

If no → creates a new one.

In [11]:
# myrange = spark.range(1000).toDF("number")

In [12]:
# myrange.show()

+------+
|number|
+------+
|     0|
|     1|
|     2|
|     3|
|     4|
|     5|
|     6|
|     7|
|     8|
|     9|
|    10|
|    11|
|    12|
|    13|
|    14|
|    15|
|    16|
|    17|
|    18|
|    19|
+------+
only showing top 20 rows


# New Section

In [8]:
flightdata = spark.read.csv("/content/sample_data/california_housing_train.csv",header=True,inferSchema=True)

In [14]:
flightdata.take(5)

[Row(longitude=-114.31, latitude=34.19, housing_median_age=15.0, total_rooms=5612.0, total_bedrooms=1283.0, population=1015.0, households=472.0, median_income=1.4936, median_house_value=66900.0),
 Row(longitude=-114.47, latitude=34.4, housing_median_age=19.0, total_rooms=7650.0, total_bedrooms=1901.0, population=1129.0, households=463.0, median_income=1.82, median_house_value=80100.0),
 Row(longitude=-114.56, latitude=33.69, housing_median_age=17.0, total_rooms=720.0, total_bedrooms=174.0, population=333.0, households=117.0, median_income=1.6509, median_house_value=85700.0),
 Row(longitude=-114.57, latitude=33.64, housing_median_age=14.0, total_rooms=1501.0, total_bedrooms=337.0, population=515.0, households=226.0, median_income=3.1917, median_house_value=73400.0),
 Row(longitude=-114.57, latitude=33.57, housing_median_age=20.0, total_rooms=1454.0, total_bedrooms=326.0, population=624.0, households=262.0, median_income=1.925, median_house_value=65500.0)]

In [None]:
flightdata.take(5)

[Row(longitude=-114.31, latitude=34.19, housing_median_age=15.0, total_rooms=5612.0, total_bedrooms=1283.0, population=1015.0, households=472.0, median_income=1.4936, median_house_value=66900.0),
 Row(longitude=-114.47, latitude=34.4, housing_median_age=19.0, total_rooms=7650.0, total_bedrooms=1901.0, population=1129.0, households=463.0, median_income=1.82, median_house_value=80100.0),
 Row(longitude=-114.56, latitude=33.69, housing_median_age=17.0, total_rooms=720.0, total_bedrooms=174.0, population=333.0, households=117.0, median_income=1.6509, median_house_value=85700.0),
 Row(longitude=-114.57, latitude=33.64, housing_median_age=14.0, total_rooms=1501.0, total_bedrooms=337.0, population=515.0, households=226.0, median_income=3.1917, median_house_value=73400.0),
 Row(longitude=-114.57, latitude=33.57, housing_median_age=20.0, total_rooms=1454.0, total_bedrooms=326.0, population=624.0, households=262.0, median_income=1.925, median_house_value=65500.0)]

In [18]:
# flightdata.select(max('housing_median_age')).take(1)

**it outputs the max age**

In [21]:
staticSchema = flightdata.schema

In [22]:
streamflightdata = spark.readStream.csv("/content/sample_data/california_housing_train.csv",header=True,inferSchema=True)

IllegalArgumentException: Schema must be specified when creating a streaming source DataFrame. If some files already exist in the directory, then depending on the file format you may be able to create a static DataFrame on that directory with 'spark.read.load(directory)' and infer schema from it.

In [23]:
streamflightdata = spark.readStream.schema(staticSchema).csv("/content/sample_data/california_housing_train.csv",header=True)

# New Section

In [24]:
df = spark.read.json("/content/sample_data/anscombe.json")



> difference between take() , collect() , show()



- show prints the first 20
- collect retrieve all data
  - you can use [] to print specific row with indexe
- take returns N rows

In [26]:
df.show()

+------+----+-----+---------------+
|Series|   X|    Y|_corrupt_record|
+------+----+-----+---------------+
|  NULL|NULL| NULL|              [|
|     I|10.0| 8.04|           NULL|
|     I| 8.0| 6.95|           NULL|
|     I|13.0| 7.58|           NULL|
|     I| 9.0| 8.81|           NULL|
|     I|11.0| 8.33|           NULL|
|     I|14.0| 9.96|           NULL|
|     I| 6.0| 7.24|           NULL|
|     I| 4.0| 4.26|           NULL|
|     I|12.0|10.84|           NULL|
|     I| 7.0| 4.81|           NULL|
|     I| 5.0| 5.68|           NULL|
|    II|10.0| 9.14|           NULL|
|    II| 8.0| 8.14|           NULL|
|    II|13.0| 8.74|           NULL|
|    II| 9.0| 8.77|           NULL|
|    II|11.0| 9.26|           NULL|
|    II|14.0|  8.1|           NULL|
|    II| 6.0| 6.13|           NULL|
|    II| 4.0|  3.1|           NULL|
+------+----+-----+---------------+
only showing top 20 rows


In [25]:
df.collect()

[Row(Series=None, X=None, Y=None, _corrupt_record='['),
 Row(Series='I', X=10.0, Y=8.04, _corrupt_record=None),
 Row(Series='I', X=8.0, Y=6.95, _corrupt_record=None),
 Row(Series='I', X=13.0, Y=7.58, _corrupt_record=None),
 Row(Series='I', X=9.0, Y=8.81, _corrupt_record=None),
 Row(Series='I', X=11.0, Y=8.33, _corrupt_record=None),
 Row(Series='I', X=14.0, Y=9.96, _corrupt_record=None),
 Row(Series='I', X=6.0, Y=7.24, _corrupt_record=None),
 Row(Series='I', X=4.0, Y=4.26, _corrupt_record=None),
 Row(Series='I', X=12.0, Y=10.84, _corrupt_record=None),
 Row(Series='I', X=7.0, Y=4.81, _corrupt_record=None),
 Row(Series='I', X=5.0, Y=5.68, _corrupt_record=None),
 Row(Series='II', X=10.0, Y=9.14, _corrupt_record=None),
 Row(Series='II', X=8.0, Y=8.14, _corrupt_record=None),
 Row(Series='II', X=13.0, Y=8.74, _corrupt_record=None),
 Row(Series='II', X=9.0, Y=8.77, _corrupt_record=None),
 Row(Series='II', X=11.0, Y=9.26, _corrupt_record=None),
 Row(Series='II', X=14.0, Y=8.1, _corrupt_record=N

In [29]:
df.take(5)

[Row(Series=None, X=None, Y=None, _corrupt_record='['),
 Row(Series='I', X=10.0, Y=8.04, _corrupt_record=None),
 Row(Series='I', X=8.0, Y=6.95, _corrupt_record=None),
 Row(Series='I', X=13.0, Y=7.58, _corrupt_record=None),
 Row(Series='I', X=9.0, Y=8.81, _corrupt_record=None)]

In [30]:
df.collect()[0]

Row(Series=None, X=None, Y=None, _corrupt_record='[')

In [31]:
df.limit(5).collect() # restrict what you extract from a DataFrame with limit()

[Row(Series=None, X=None, Y=None, _corrupt_record='['),
 Row(Series='I', X=10.0, Y=8.04, _corrupt_record=None),
 Row(Series='I', X=8.0, Y=6.95, _corrupt_record=None),
 Row(Series='I', X=13.0, Y=7.58, _corrupt_record=None),
 Row(Series='I', X=9.0, Y=8.81, _corrupt_record=None)]

In [32]:
df.printSchema()

root
 |-- Series: string (nullable = true)
 |-- X: double (nullable = true)
 |-- Y: double (nullable = true)
 |-- _corrupt_record: string (nullable = true)



In [33]:
from pyspark.sql.types import *
# pyspark.sql.types → This module contains all the data type classes used in Spark SQL (like StringType, IntegerType, StructType, etc.)

In [36]:
testscheme2 = StructType([ StructField("llll",StringType(),True),
                           StructField('dfjf',StringType(),True) ])

from pyspark.sql.types import *

Imports all the data type classes (StructType, StructField, StringType, IntegerType, LongType, etc.) used to define schemas for DataFrames.

In [51]:

python
testSchema1 = StructType([
    StructField('chapters', StringType(), False),
    StructField('pages', IntegerType(), True)
])
Defines a schema with two columns:

chapters → String, not nullable (False).

pages → Integer, nullable (True).

In [52]:

python
testSchema = StructType([
    StructField("DEST_COUNTRY_NAME", StringType(), True),
    StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
    StructField("count", LongType(), False)
])
Defines a schema for the flight dataset:

DEST_COUNTRY_NAME → String, nullable.

ORIGIN_COUNTRY_NAME → String, nullable.

count → Long (big integer), not nullable.

In [53]:

python
df = spark.read.schema(testSchema).json("data/flight-data/json/2015-summary.json")
Reads the JSON file 2015-summary.json using the schema you defined.

Ensures the DataFrame has the exact column names and types you specified, instead of inferring them automatically.

In [54]:

python
df.collect()
Collects all rows of the DataFrame into a Python list of Row objects.

Each Row will look like:

python
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15)
⚠️ Note: collect() brings all data into the driver’s memory. For large datasets, this can be very heavy. Usually, you’d use .show() or .take(n) to preview instead.

In [37]:
mySchema = StructType([ StructField("some", StringType(), True),
                              StructField("col", StringType(), True),
                              StructField("names", LongType(), False)])
my_Row = Row ('Hello', None, 1)
myDf = spark.createDataFrame([my_Row], mySchema)
myDf.show()

+-----+----+-----+
| some| col|names|
+-----+----+-----+
|Hello|NULL|    1|
+-----+----+-----+



# New Section

In [7]:
# df = spark.read.json("/content/sample_data/anscombe.json",header=True,inferSchema=True)

TypeError: DataFrameReader.json() got an unexpected keyword argument 'header'

In [9]:
flightdata.limit(5).collect()

[Row(longitude=-114.31, latitude=34.19, housing_median_age=15.0, total_rooms=5612.0, total_bedrooms=1283.0, population=1015.0, households=472.0, median_income=1.4936, median_house_value=66900.0),
 Row(longitude=-114.47, latitude=34.4, housing_median_age=19.0, total_rooms=7650.0, total_bedrooms=1901.0, population=1129.0, households=463.0, median_income=1.82, median_house_value=80100.0),
 Row(longitude=-114.56, latitude=33.69, housing_median_age=17.0, total_rooms=720.0, total_bedrooms=174.0, population=333.0, households=117.0, median_income=1.6509, median_house_value=85700.0),
 Row(longitude=-114.57, latitude=33.64, housing_median_age=14.0, total_rooms=1501.0, total_bedrooms=337.0, population=515.0, households=226.0, median_income=3.1917, median_house_value=73400.0),
 Row(longitude=-114.57, latitude=33.57, housing_median_age=20.0, total_rooms=1454.0, total_bedrooms=326.0, population=624.0, households=262.0, median_income=1.925, median_house_value=65500.0)]

In [10]:
flightdata.take(2)

[Row(longitude=-114.31, latitude=34.19, housing_median_age=15.0, total_rooms=5612.0, total_bedrooms=1283.0, population=1015.0, households=472.0, median_income=1.4936, median_house_value=66900.0),
 Row(longitude=-114.47, latitude=34.4, housing_median_age=19.0, total_rooms=7650.0, total_bedrooms=1901.0, population=1129.0, households=463.0, median_income=1.82, median_house_value=80100.0)]

In [11]:
flightdata.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



In [12]:
from pyspark.sql.functions import col,expr

In [13]:
flightdata.select("longitude").show(3)

+---------+
|longitude|
+---------+
|  -114.31|
|  -114.47|
|  -114.56|
+---------+
only showing top 3 rows


In [14]:
flightdata.selectExpr("longitude as long").show(4)

+-------+
|   long|
+-------+
|-114.31|
|-114.47|
|-114.56|
|-114.57|
+-------+
only showing top 4 rows


In [15]:
flightdata.select(col("longitude").alias("ll")).show(3)

+-------+
|     ll|
+-------+
|-114.31|
|-114.47|
|-114.56|
+-------+
only showing top 3 rows


In [16]:
flightdata.withColumnRenamed("longitude","lof").show(3)

+-------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|    lof|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+-------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|-114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|-114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|-114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
+-------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
only showing top 3 rows


In [17]:
flightdata.where("population > 1050").show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.58|   33.61|              25.0|     2907.0|         680.0|    1841.0|     633.0|       2.6768|           82400.0|
|  -114.59|   33.61|              34.0|     4789.0|        1175.0|    3134.0|    1056.0|       2.1782|           58400.0|
|   -114.6|   33.62|              16.0|     3741.0|         801.0|    2434.0|     824.0|       2.6797|           86500.0|
|   -114.6|    33.6|              21.0|     1988.0|         483.0|    1182.0|     437.0|        1.625|           62000.0|
|  -114.61|   34.83|    

In [None]:
flightdata.where("population > 1050").show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.58|   33.61|              25.0|     2907.0|         680.0|    1841.0|     633.0|       2.6768|           82400.0|
|  -114.59|   33.61|              34.0|     4789.0|        1175.0|    3134.0|    1056.0|       2.1782|           58400.0|
|   -114.6|   33.62|              16.0|     3741.0|         801.0|    2434.0|     824.0|       2.6797|           86500.0|
|   -114.6|    33.6|              21.0|     1988.0|         483.0|    1182.0|     437.0|        1.625|           62000.0|
|  -114.61|   34.83|    

In [None]:
flightdata.where("population > 1050").show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.58|   33.61|              25.0|     2907.0|         680.0|    1841.0|     633.0|       2.6768|           82400.0|
|  -114.59|   33.61|              34.0|     4789.0|        1175.0|    3134.0|    1056.0|       2.1782|           58400.0|
|   -114.6|   33.62|              16.0|     3741.0|         801.0|    2434.0|     824.0|       2.6797|           86500.0|
|   -114.6|    33.6|              21.0|     1988.0|         483.0|    1182.0|     437.0|        1.625|           62000.0|
|  -114.61|   34.83|    

In [18]:
flightdata.where(col("population") > 1050 ).where(col("total_rooms") > 500).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.58|   33.61|              25.0|     2907.0|         680.0|    1841.0|     633.0|       2.6768|           82400.0|
|  -114.59|   33.61|              34.0|     4789.0|        1175.0|    3134.0|    1056.0|       2.1782|           58400.0|
|   -114.6|   33.62|              16.0|     3741.0|         801.0|    2434.0|     824.0|       2.6797|           86500.0|
|   -114.6|    33.6|              21.0|     1988.0|         483.0|    1182.0|     437.0|        1.625|           62000.0|
|  -114.61|   34.83|    

In [21]:
df = flightdata.withColumn("extra",expr("population > households * 2 "))

In [22]:
df.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-----+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|extra|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-----+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0| true|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0| true|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0| true|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0| true|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|  

In [23]:
df_dropped = df.drop("total_rooms")

In [24]:
df_dropped.show()

+---------+--------+------------------+--------------+----------+----------+-------------+------------------+-----+
|longitude|latitude|housing_median_age|total_bedrooms|population|households|median_income|median_house_value|extra|
+---------+--------+------------------+--------------+----------+----------+-------------+------------------+-----+
|  -114.31|   34.19|              15.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0| true|
|  -114.47|    34.4|              19.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0| true|
|  -114.56|   33.69|              17.0|         174.0|     333.0|     117.0|       1.6509|           85700.0| true|
|  -114.57|   33.64|              14.0|         337.0|     515.0|     226.0|       3.1917|           73400.0| true|
|  -114.57|   33.57|              20.0|         326.0|     624.0|     262.0|        1.925|           65500.0| true|
|  -114.58|   33.63|              29.0|         236.0|     671.0|     23

In [27]:
df.select(col("longitude").alias("long")).show()

+-------+
|   long|
+-------+
|-114.31|
|-114.47|
|-114.56|
|-114.57|
|-114.57|
|-114.58|
|-114.58|
|-114.59|
|-114.59|
| -114.6|
| -114.6|
| -114.6|
|-114.61|
|-114.61|
|-114.63|
|-114.65|
|-114.65|
|-114.65|
|-114.66|
|-114.67|
+-------+
only showing top 20 rows


In [28]:
df.where(col("housing_median_age")>15).where(col("latitude") > 30).show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-----+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|extra|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-----+
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0| true|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0| true|
|  -114.57|   33.57|              20.0|     1454.0|         326.0|     624.0|     262.0|        1.925|           65500.0| true|
|  -114.58|   33.63|              29.0|     1387.0|         236.0|     671.0|     239.0|       3.3438|           74000.0| true|
|  -114.58|   33.61|              25.0|     2907.0|         680.0|    1841.0|     633.0|       2.6768|  