In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

from pyspark.sql import functions as F
from pyspark.sql import types as T 
from pyspark.sql import Window as W 

In [3]:
conf = SparkConf() \
    .setAppName("Example") \
    .setMaster("local") \
    .set("spark.driver.extraClassPath","C:/pyspark/*")

sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)

In [4]:
TimeSeries_df = spark.read.options(delimiter=",", header=True).csv("TimeSeries.csv")
TimeSeries_df.show()

+--------+--------+-----+
|   Metro|    Date|Value|
+--------+--------+-----+
| Redmond|2/1/2024|    0|
| Redmond|3/1/2024|  0.1|
| Redmond|4/1/2024|  0.2|
| Redmond|5/1/2024|  0.3|
| Redmond|6/1/2024|  0.4|
| Redmond|7/1/2024| -0.1|
|Bellevue|3/1/2024|  0.7|
|Bellevue|4/1/2024|  0.7|
|Bellevue|5/1/2024|  0.8|
|Bellevue|6/1/2024|  0.9|
|Bellevue|7/1/2024|    1|
| Seattle|4/1/2024|  0.6|
| Seattle|5/1/2024|  0.7|
| Seattle|6/1/2024|  0.8|
| Seattle|7/1/2024|  0.7|
| Seattle|8/1/2024|  0.6|
+--------+--------+-----+



What do you notice when you see this data
- I find the value given as -0.1 and since th Value column denotes percentage of Dog ownership, it cant have a negative value
- The value -0.1 can be handled in many ways

In [5]:
MetroPropery_df = spark.read.options(delimiter=",", header=True).csv("MetroProperties.csv")
MetroPropery_df.show()

+--------+----------+------+
|   Metro|  Property| Value|
+--------+----------+------+
| Redmond|     State|    WA|
| Redmond|  Eastside|  TRUE|
| Redmond|Population| 70000|
| Redmond|     Parks|    47|
|Bellevue|     State|    WA|
|Bellevue|  Eastside|  TRUE|
|Bellevue|Population|154600|
|Bellevue|     Parks|  100+|
| Seattle|     State|    WA|
| Seattle|  Eastside| FALSE|
| Seattle|Population|750000|
| Seattle|     Parks|    87|
|    NULL|      NULL|  NULL|
+--------+----------+------+



What do you notice when you see this data 
- We have an additional null column, that can be removed. It makes no value
- The value column had mixed datatypes that can be handled too. 
- I see a value 100+ that has to be handled.

Coding Questions - 
Transform the MetroProperties dataframe such that the columns are: City, State, Population, Parks, Eastside


In [6]:
transformed_df = MetroPropery_df.groupBy("Metro") \
.pivot("Property") \
.agg(F.expr("coalesce(first(Value), '')")) \
.selectExpr("Metro as City", "State", "Population", "Parks", "Eastside")

transformed_df.show()

+--------+-----+----------+-----+--------+
|    City|State|Population|Parks|Eastside|
+--------+-----+----------+-----+--------+
|    NULL|     |          |     |        |
|Bellevue|   WA|    154600| 100+|    TRUE|
| Redmond|   WA|     70000|   47|    TRUE|
| Seattle|   WA|    750000|   87|   FALSE|
+--------+-----+----------+-----+--------+



In [7]:
transformed_df.columns

['City', 'State', 'Population', 'Parks', 'Eastside']

In [8]:
# Cleaning more
clean_df = transformed_df.na.drop()
clean_df = clean_df.withColumn("Parks", F.expr("CASE WHEN Parks = '100+' THEN 100 ELSE Parks END"))
clean_df.show()

+--------+-----+----------+-----+--------+
|    City|State|Population|Parks|Eastside|
+--------+-----+----------+-----+--------+
|Bellevue|   WA|    154600|  100|    TRUE|
| Redmond|   WA|     70000|   47|    TRUE|
| Seattle|   WA|    750000|   87|   FALSE|
+--------+-----+----------+-----+--------+



For each time series in TimeSeries.csv, I want to have data for all of: Jan 2024 to Aug 2024.  
Impute the missing values as follows:
Should be 0.0 on 2024-01-01, and then keep flat until the 1st nonzero value for each City

Do constant extrapolation after the last nonzero value in the time series until 2024-08-01. 

In [114]:
import pandas as pd
df = pd.read_csv("C:/Users/manit/Downloads/TimeSeries.csv")


In [115]:
df['Date'] = pd.to_datetime(df['Date'])
df

Unnamed: 0,Metro,Date,Value
0,Redmond,2024-02-01,0.0
1,Redmond,2024-03-01,0.1
2,Redmond,2024-04-01,0.2
3,Redmond,2024-05-01,0.3
4,Redmond,2024-06-01,0.4
5,Redmond,2024-07-01,-0.1
6,Bellevue,2024-03-01,0.7
7,Bellevue,2024-04-01,0.7
8,Bellevue,2024-05-01,0.8
9,Bellevue,2024-06-01,0.9


In [116]:
# Create a date range from January 2024 to August 2024
date_range = pd.date_range(start='2024-01-01', end='2024-08-01', freq='MS')
date_range

DatetimeIndex(['2024-01-01', '2024-02-01', '2024-03-01', '2024-04-01',
               '2024-05-01', '2024-06-01', '2024-07-01', '2024-08-01'],
              dtype='datetime64[ns]', freq='MS')

In [117]:
# Create an empty dataframe to store the interpolated data
interpolated_df = pd.DataFrame(columns=['Metro', 'Date', 'Value'])
interpolated_df

Unnamed: 0,Metro,Date,Value


In [119]:
# Iterate over each unique "Metro" in the dataframe
for metro, group in df.groupby('Metro'):
    # Filter the dataframe to get data for the current metro
    metro_data = group.copy()
    
    # Set the "Date" column as the index for easier interpolation
    metro_data.set_index('Date', inplace=True)

    # Fill missing values with zeros until the first nonzero value
    metro_data['Value'] = metro_data['Value'].fillna(0)
    metro_data['Value'] = metro_data['Value'].cummax()
    
    # Interpolate missing values for the months where data is not available
    metro_data = metro_data.reindex(date_range)

    # Perform constant extrapolation after the last nonzero value
    last_nonzero_index = metro_data.loc[metro_data['Value'] != 0].index[-1]
    last_nonzero_value = metro_data.loc[last_nonzero_index, 'Value']
    metro_data.loc[last_nonzero_index:, 'Value'] = last_nonzero_value
    
    # Reset the index to add "Date" as a column
    metro_data = metro_data.reset_index()
    
    # Add the "Metro" column back
    metro_data['Metro'] = metro
    
    # Append the interpolated data to the final dataframe
    interpolated_df = pd.concat([interpolated_df, metro_data], ignore_index=True)


  interpolated_df = pd.concat([interpolated_df, metro_data], ignore_index=True)


In [120]:
interpolated_df

Unnamed: 0,Metro,Date,Value,index
0,Bellevue,,,2024-01-01
1,Bellevue,,,2024-02-01
2,Bellevue,,0.7,2024-03-01
3,Bellevue,,0.7,2024-04-01
4,Bellevue,,0.8,2024-05-01
5,Bellevue,,0.9,2024-06-01
6,Bellevue,,1.0,2024-07-01
7,Bellevue,,,2024-08-01
8,Redmond,,,2024-01-01
9,Redmond,,0.0,2024-02-01


In [121]:
interpolated_df = interpolated_df.drop(columns=['Date'])
interpolated_df = interpolated_df.rename(columns={'index': 'Date'})
interpolated_df

Unnamed: 0,Metro,Value,Date
0,Bellevue,,2024-01-01
1,Bellevue,,2024-02-01
2,Bellevue,0.7,2024-03-01
3,Bellevue,0.7,2024-04-01
4,Bellevue,0.8,2024-05-01
5,Bellevue,0.9,2024-06-01
6,Bellevue,1.0,2024-07-01
7,Bellevue,,2024-08-01
8,Redmond,,2024-01-01
9,Redmond,0.0,2024-02-01
