In [1]:
pip install pyspark



In [2]:
from pyspark.sql import SparkSession

# Create the Spark session used by every later cell, or reuse one that is
# already running in this kernel (getOrCreate).
spark = (
    SparkSession.builder
    .appName("ETL Pipeline")
    .getOrCreate()
)

In [3]:
# Load the raw wide-format temperature CSV (one F<year> column per year).
# The first line is treated as a header, and column types are inferred from the data.
file_path = "/content/sample_data/temperature.csv"
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(file_path)
)

# Inspect the inferred column types and the first few records.
df.printSchema()
df.show(n=5)

root
 |-- ObjectId: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- ISO2: string (nullable = true)
 |-- ISO3: string (nullable = true)
 |-- F1961: double (nullable = true)
 |-- F1962: double (nullable = true)
 |-- F1963: double (nullable = true)
 |-- F1964: double (nullable = true)
 |-- F1965: double (nullable = true)
 |-- F1966: double (nullable = true)
 |-- F1967: double (nullable = true)
 |-- F1968: double (nullable = true)
 |-- F1969: double (nullable = true)
 |-- F1970: double (nullable = true)
 |-- F1971: double (nullable = true)
 |-- F1972: double (nullable = true)
 |-- F1973: double (nullable = true)
 |-- F1974: double (nullable = true)
 |-- F1975: double (nullable = true)
 |-- F1976: double (nullable = true)
 |-- F1977: double (nullable = true)
 |-- F1978: double (nullable = true)
 |-- F1979: double (nullable = true)
 |-- F1980: double (nullable = true)
 |-- F1981: double (nullable = true)
 |-- F1982: double (nullable = true)
 |-- F1983: double (nullable 

In [4]:
# Replace missing ISO2 country codes with a placeholder.
# NOTE(review): only ISO2 is filled, but the reshape step keeps ISO3 (not ISO2),
# so this fill does not reach the written output. Confirm which code is intended.
df = df.fillna({"ISO2": "Unknown"})

# Drop rows where every yearly temperature value is null.
# Year columns are "F" followed only by digits (F1961, F1962, ...). Matching the
# digits as well means a non-year column that merely starts with "F" is never
# treated as a temperature column.
temperature_columns = [
    name for name in df.columns if name.startswith("F") and name[1:].isdigit()
]
# With an empty subset, dropna(how="all") would silently drop every row.
assert temperature_columns, "no F<year> temperature columns found in the input"
df = df.dropna(subset=temperature_columns, how="all")

In [5]:
from pyspark.sql.functions import expr

# Reshape the wide table (one F<year> column per year) into long rows of
# (Year, Temperature) per country.
# Year columns are discovered from the schema rather than hard-coded as
# 62 columns starting at 1961. This keeps the cell correct if the dataset gains
# or loses years.
year_columns = [
    name for name in df.columns if name.startswith("F") and name[1:].isdigit()
]
# stack(0, ...) is invalid SQL, so fail with a clear message instead.
assert year_columns, "no F<year> temperature columns found to reshape"

# Emit each year as an integer literal paired with its column, so Year comes
# out of stack() as an int with no string-label parsing step.
stack_pairs = ", ".join(f"{int(name[1:])}, `{name}`" for name in year_columns)
df_pivot = df.select(
    "ObjectId", "Country", "ISO3",
    expr(f"stack({len(year_columns)}, {stack_pairs}) as (Year, Temperature)"),
)
df_pivot.show(5)

+--------+--------------------+----+----+-----------+
|ObjectId|             Country|ISO3|Year|Temperature|
+--------+--------------------+----+----+-----------+
|       1|Afghanistan, Isla...| AFG|1961|     -0.113|
|       1|Afghanistan, Isla...| AFG|1962|     -0.164|
|       1|Afghanistan, Isla...| AFG|1963|      0.847|
|       1|Afghanistan, Isla...| AFG|1964|     -0.764|
|       1|Afghanistan, Isla...| AFG|1965|     -0.244|
+--------+--------------------+----+----+-----------+
only showing top 5 rows



In [6]:
# Persist the long-format table as Parquet, replacing any previous run's output.
# Use a path relative to the notebook's working directory instead of the
# filesystem root ("/"), which is only writable when running as root.
output_path = "processed_temperature.parquet"
df_pivot.write.mode("overwrite").parquet(output_path)

In [7]:
# Read the Parquet output back and preview it to check the write round-trips.
processed_df = spark.read.format("parquet").load(output_path)
processed_df.show(n=5)

+--------+--------------------+----+----+-----------+
|ObjectId|             Country|ISO3|Year|Temperature|
+--------+--------------------+----+----+-----------+
|       1|Afghanistan, Isla...| AFG|1961|     -0.113|
|       1|Afghanistan, Isla...| AFG|1962|     -0.164|
|       1|Afghanistan, Isla...| AFG|1963|      0.847|
|       1|Afghanistan, Isla...| AFG|1964|     -0.764|
|       1|Afghanistan, Isla...| AFG|1965|     -0.244|
+--------+--------------------+----+----+-----------+
only showing top 5 rows

