# **NoteBook 1**

### _1. Reading Data From Table_

In [4]:
df = spark.read.table("temperature_table")

StatementMeta(, b14fb5c5-babb-44f4-a42b-b48f7fbcd8d2, 6, Finished, Available, Finished)

In [8]:
df.select('Area_Code',
 'Area',
 'Months_Code',
 'Months',
 'Element_Code',
 'Element',
 'Unit',
 'Y1961').show(10)

StatementMeta(, b14fb5c5-babb-44f4-a42b-b48f7fbcd8d2, 10, Finished, Available, Finished)

+---------+-----------+-----------+--------+------------+------------------+----+------+
|Area_Code|       Area|Months_Code|  Months|Element_Code|           Element|Unit| Y1961|
+---------+-----------+-----------+--------+------------+------------------+----+------+
|        2|Afghanistan|       7001| January|        7271|Temperature change|  �C| 0.777|
|        2|Afghanistan|       7001| January|        6078|Standard Deviation|  �C|  1.95|
|        2|Afghanistan|       7002|February|        7271|Temperature change|  �C|-1.743|
|        2|Afghanistan|       7002|February|        6078|Standard Deviation|  �C| 2.597|
|        2|Afghanistan|       7003|   March|        7271|Temperature change|  �C| 0.516|
|        2|Afghanistan|       7003|   March|        6078|Standard Deviation|  �C| 1.512|
|        2|Afghanistan|       7004|   April|        7271|Temperature change|  �C|-1.709|
|        2|Afghanistan|       7004|   April|        6078|Standard Deviation|  �C| 1.406|
|        2|Afghanista

---

## _2. Data Cleaning and Transforming_

In [11]:
# Replacing Bad character Value

from pyspark.sql.functions import regexp_replace, col, trim

df = df.withColumn(
    "Unit",
    trim(regexp_replace(col("Unit"), "�C", "degree celsius"))  
)

StatementMeta(, b14fb5c5-babb-44f4-a42b-b48f7fbcd8d2, 13, Finished, Available, Finished)

In [10]:
df.select('Area_Code',
 'Area',
 'Months_Code',
 'Months',
 'Element_Code',
 'Element',
 'Unit',
 'Y1961').show(10)

StatementMeta(, b14fb5c5-babb-44f4-a42b-b48f7fbcd8d2, 12, Finished, Available, Finished)

+---------+-----------+-----------+--------+------------+------------------+--------------+------+
|Area_Code|       Area|Months_Code|  Months|Element_Code|           Element|          Unit| Y1961|
+---------+-----------+-----------+--------+------------+------------------+--------------+------+
|        2|Afghanistan|       7001| January|        7271|Temperature change|degree celsius| 0.777|
|        2|Afghanistan|       7001| January|        6078|Standard Deviation|degree celsius|  1.95|
|        2|Afghanistan|       7002|February|        7271|Temperature change|degree celsius|-1.743|
|        2|Afghanistan|       7002|February|        6078|Standard Deviation|degree celsius| 2.597|
|        2|Afghanistan|       7003|   March|        7271|Temperature change|degree celsius| 0.516|
|        2|Afghanistan|       7003|   March|        6078|Standard Deviation|degree celsius| 1.512|
|        2|Afghanistan|       7004|   April|        7271|Temperature change|degree celsius|-1.709|
|        2

In [12]:
# Handling Nulls and duplicates

from pyspark.sql.functions import col, sum

df = df.na.drop()
df = df.dropDuplicates()
df = df.drop("name")

StatementMeta(, b14fb5c5-babb-44f4-a42b-b48f7fbcd8d2, 14, Finished, Available, Finished)

In [13]:
df.select('Area_Code',
 'Area',
 'Months_Code',
 'Months',
 'Element_Code',
 'Element',
 'Unit',
 'Y1961').show(10)

StatementMeta(, b14fb5c5-babb-44f4-a42b-b48f7fbcd8d2, 15, Finished, Available, Finished)

+---------+--------------------+-----------+-----------+------------+------------------+--------------+------+
|Area_Code|                Area|Months_Code|     Months|Element_Code|           Element|          Unit| Y1961|
+---------+--------------------+-----------+-----------+------------+------------------+--------------+------+
|        4|             Algeria|       7012|   December|        6078|Standard Deviation|degree celsius| 1.124|
|       23|              Belize|       7017|Mar�Apr�May|        6078|Standard Deviation|degree celsius| 0.682|
|      259|     Channel Islands|       7006|       June|        6078|Standard Deviation|degree celsius| 1.117|
|       44|            Colombia|       7010|    October|        6078|Standard Deviation|degree celsius|  0.31|
|       74|               Gabon|       7002|   February|        6078|Standard Deviation|degree celsius| 0.387|
|      175|       Guinea-Bissau|       7012|   December|        6078|Standard Deviation|degree celsius| 0.527|
|

In [16]:
# Checking data types of all column whether they are correct or not
df.dtypes

StatementMeta(, b14fb5c5-babb-44f4-a42b-b48f7fbcd8d2, 18, Finished, Available, Finished)

[('Area_Code', 'int'),
 ('Area', 'string'),
 ('Months_Code', 'int'),
 ('Months', 'string'),
 ('Element_Code', 'int'),
 ('Element', 'string'),
 ('Unit', 'string'),
 ('Y1961', 'double'),
 ('Y1962', 'double'),
 ('Y1963', 'double'),
 ('Y1964', 'double'),
 ('Y1965', 'double'),
 ('Y1966', 'double'),
 ('Y1967', 'double'),
 ('Y1968', 'double'),
 ('Y1969', 'double'),
 ('Y1970', 'double'),
 ('Y1971', 'double'),
 ('Y1972', 'double'),
 ('Y1973', 'double'),
 ('Y1974', 'double'),
 ('Y1975', 'double'),
 ('Y1976', 'double'),
 ('Y1977', 'double'),
 ('Y1978', 'double'),
 ('Y1979', 'double'),
 ('Y1980', 'double'),
 ('Y1981', 'double'),
 ('Y1982', 'double'),
 ('Y1983', 'double'),
 ('Y1984', 'double'),
 ('Y1985', 'double'),
 ('Y1986', 'double'),
 ('Y1987', 'double'),
 ('Y1988', 'double'),
 ('Y1989', 'double'),
 ('Y1990', 'double'),
 ('Y1991', 'double'),
 ('Y1992', 'double'),
 ('Y1993', 'double'),
 ('Y1994', 'double'),
 ('Y1995', 'double'),
 ('Y1996', 'double'),
 ('Y1997', 'double'),
 ('Y1998', 'double'),
 

In [17]:
# Replacing invalid month values with "unknown"

from pyspark.sql.functions import when, lower, col

valid_months = [
    "january", "february", "march", "april", "may", "june",
    "july", "august", "september", "october", "november", "december"
]

df = df.withColumn(
    "Months",
    when(lower(col("Months")).isin(valid_months), col("Months")) 
    .otherwise("Unknown")  
)

StatementMeta(, b14fb5c5-babb-44f4-a42b-b48f7fbcd8d2, 19, Finished, Available, Finished)

In [18]:
df.select('Area_Code',
 'Area',
 'Months_Code',
 'Months',
 'Element_Code',
 'Element',
 'Unit',
 'Y1961').show(10)

StatementMeta(, b14fb5c5-babb-44f4-a42b-b48f7fbcd8d2, 20, Finished, Available, Finished)

+---------+--------------------+-----------+---------+------------+------------------+--------------+------+
|Area_Code|                Area|Months_Code|   Months|Element_Code|           Element|          Unit| Y1961|
+---------+--------------------+-----------+---------+------------+------------------+--------------+------+
|        4|             Algeria|       7012| December|        6078|Standard Deviation|degree celsius| 1.124|
|       23|              Belize|       7017|  Unknown|        6078|Standard Deviation|degree celsius| 0.682|
|      259|     Channel Islands|       7006|     June|        6078|Standard Deviation|degree celsius| 1.117|
|       44|            Colombia|       7010|  October|        6078|Standard Deviation|degree celsius|  0.31|
|       74|               Gabon|       7002| February|        6078|Standard Deviation|degree celsius| 0.387|
|      175|       Guinea-Bissau|       7012| December|        6078|Standard Deviation|degree celsius| 0.527|
|       94|        

In [19]:
# Replacing Bad value cells with "unknown"

from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

def clean_non_ascii(val):
    if val is None:
        return None
    try:
        return val if val.isascii() else "Unknown"
    except Exception:
        return "Unknown"

clean_non_ascii_udf = udf(clean_non_ascii, StringType())

string_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, StringType)]

for c in string_cols:
    df = df.withColumn(c, clean_non_ascii_udf(col(c)))

StatementMeta(, b14fb5c5-babb-44f4-a42b-b48f7fbcd8d2, 21, Finished, Available, Finished)

In [20]:
df.select('Area_Code',
 'Area',
 'Months_Code',
 'Months',
 'Element_Code',
 'Element',
 'Unit',
 'Y1961').show(10)

StatementMeta(, b14fb5c5-babb-44f4-a42b-b48f7fbcd8d2, 22, Finished, Available, Finished)

+---------+--------------------+-----------+---------+------------+------------------+--------------+------+
|Area_Code|                Area|Months_Code|   Months|Element_Code|           Element|          Unit| Y1961|
+---------+--------------------+-----------+---------+------------+------------------+--------------+------+
|        4|             Algeria|       7012| December|        6078|Standard Deviation|degree celsius| 1.124|
|       23|              Belize|       7017|  Unknown|        6078|Standard Deviation|degree celsius| 0.682|
|      259|     Channel Islands|       7006|     June|        6078|Standard Deviation|degree celsius| 1.117|
|       44|            Colombia|       7010|  October|        6078|Standard Deviation|degree celsius|  0.31|
|       74|               Gabon|       7002| February|        6078|Standard Deviation|degree celsius| 0.387|
|      175|       Guinea-Bissau|       7012| December|        6078|Standard Deviation|degree celsius| 0.527|
|       94|        

----

## _3. Saving back to table_

In [23]:
df.write.mode("overwrite").option("overwriteSchema", "true").saveAsTable("temperature_table")

StatementMeta(, b14fb5c5-babb-44f4-a42b-b48f7fbcd8d2, 25, Finished, Available, Finished)

-----

## _4. Using SparkSql for querying_

In [31]:
# Finding Temperature for India in the October, Novemeber in 2019
query1 = """SELECT 
                Area,
                Months,
                Y2019
            FROM 
                temperature_table 
            WHERE 
                Area = 'India' AND Months IN ('October', 'November');"""

StatementMeta(, b14fb5c5-babb-44f4-a42b-b48f7fbcd8d2, 33, Finished, Available, Finished)

In [32]:
spark.sql(query1).show(10)

StatementMeta(, b14fb5c5-babb-44f4-a42b-b48f7fbcd8d2, 34, Finished, Available, Finished)

+-----+--------+-----+
| Area|  Months|Y2019|
+-----+--------+-----+
|India| October|0.538|
|India|November|1.389|
|India|November|0.764|
|India| October|0.267|
+-----+--------+-----+



In [38]:
# Find Which Country Had the Highest Value in 2019
query2 = """SELECT 
                Area, 
                Y2019
            FROM 
                temperature_table
            ORDER BY 
                Y2019 DESC
            LIMIT
                1;
"""

StatementMeta(, b14fb5c5-babb-44f4-a42b-b48f7fbcd8d2, 40, Finished, Available, Finished)

In [40]:
spark.sql(query2).show(10, truncate=False)

StatementMeta(, b14fb5c5-babb-44f4-a42b-b48f7fbcd8d2, 42, Finished, Available, Finished)

+------------------------------+-----+
|Area                          |Y2019|
+------------------------------+-----+
|Svalbard and Jan Mayen Islands|7.215|
+------------------------------+-----+



----