## One-time data transformation
In this notebook, we transform the stations and weather data so that they conform to the Redshift schemas of their corresponding tables.

The preprocessed data is saved back to S3 before being loaded into Redshift.

In [1]:
import pyspark
import os

In [2]:
# Record the PySpark version this notebook was executed with.
pyspark.__version__

'3.4.0'

In [3]:
from pyspark.sql import SparkSession

In [4]:
# Build a local Spark session that can read/write s3a:// paths.
#
# S3A credentials come from the standard AWS environment variables. The previous
# version of this cell passed the literal access-key ID and secret as the
# *names* of environment variables. That leaked both values into the notebook,
# and at runtime it looked up variables that do not exist.
# NOTE(review): those leaked keys must be treated as compromised and rotated.
#
# os.environ[...] raises KeyError naming the missing variable, so a misconfigured
# environment fails here instead of later with an opaque S3 auth error.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('data-transformer')
    .config("spark.hadoop.fs.s3a.access.key", os.environ['AWS_ACCESS_KEY_ID'])
    .config("spark.hadoop.fs.s3a.secret.key", os.environ['AWS_SECRET_ACCESS_KEY'])
    .getOrCreate()
)

23/06/25 14:37:27 WARN Utils: Your hostname, Jrs-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.134 instead (on interface en0)
23/06/25 14:37:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/06/25 14:37:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Grab the underlying SparkContext and ask the JVM (via the private `_jvm`
# gateway) which Hadoop version this Spark install is built against.
# NOTE(review): presumably checked so a matching hadoop-aws jar is used for
# s3a:// access — confirm.
sc = spark.sparkContext
sc._jvm.org.apache.hadoop.util.VersionInfo.getVersion()

'3.3.4'

###  1. Stations data

In [6]:
# Load the raw stations CSV from S3. The first row provides column names, and
# Spark makes an extra pass over the data to infer each column's type.
df_stations = spark.read.csv(
    "s3a://hrc-de-data/raw/cycling-extras/stations.csv",
    header=True,
    inferSchema=True,
)

In [None]:
# Peek at the first two raw rows (returned as a list of Row objects).
df_stations.head(2)

In [None]:
# Inspect the column names and the types inferred from the CSV.
df_stations.printSchema()

In [None]:
from pyspark.sql import functions as F, types as T

In [None]:
# Rename the raw CSV headers to the snake_case names used downstream.
# `easting` and `northing` already carry their target names, so they need no rename.
stations = (
    df_stations
    .withColumnRenamed('Station.Id', 'station_id')
    .withColumnRenamed('StationName', 'station_name')
)

# withColumnRenamed is a silent no-op when the source column is absent, so a
# changed CSV header would otherwise slip through unnoticed. Verify the expected
# columns are present before the frame is written out.
expected_station_cols = {'station_id', 'station_name', 'easting', 'northing'}
missing_station_cols = expected_station_cols - set(stations.columns)
assert not missing_station_cols, f"stations is missing columns: {sorted(missing_station_cols)}"

In [None]:
# Preview the first 5 rows after renaming.
stations.show(5)

In [None]:
# For every stations column, count the rows whose value is null or NaN.
# (`when` yields a non-null literal only for matching rows, and `count` tallies those.)
null_or_nan_counts = [
    F.count(F.when(F.col(name).isNull() | F.isnan(name), name)).alias(name)
    for name in stations.columns
]
stations.select(null_or_nan_counts).show()

In [None]:
# Persist the processed stations as Parquet, replacing any previous output.
stations.write.mode('overwrite').parquet('s3a://hrc-de-data/processed/cycling-dimension/stations/')

### 2. Weather data

In [None]:
# Load the raw weather JSON from S3 (schema inferred by Spark).
df_weather = spark.read.format("json").load("s3a://hrc-de-data/raw/cycling-extras/weather.json")

In [None]:
# Peek at the first two raw weather rows.
df_weather.head(2)

In [None]:
# Inspect the schema Spark inferred from the weather JSON.
df_weather.printSchema()

In [None]:
# Raw weather fields we won't need downstream.
unused_weather_cols = [
    'cloudcover', 'conditions', 'datetimeEpoch', 'description', 'dew', 'icon',
    'precipcover', 'preciptype', 'source', 'stations', 'sunriseEpoch', 'sunsetEpoch',
]
weather = df_weather.drop(*unused_weather_cols)

In [None]:
# Rename `datetime` to `weather_date` and cast it to a DateType column.
weather = (
    weather
    .withColumnRenamed('datetime', 'weather_date')
    .withColumn('weather_date', F.col('weather_date').cast(T.DateType()))
)

In [None]:
# Show the trimmed weather schema and how many columns remain after the drop.
weather.printSchema()
print(len(weather.columns), 'columns')

In [None]:
# Preview two rows of the trimmed weather frame.
weather.show(2)

In [None]:
# Count null/NaN values in a single weather column.
# The previous comment and variable name said `windgust`, but the query and
# alias counted `tzoffset`; naming now matches what is actually measured.
# Change `target_col` to inspect a different column.
target_col = 'tzoffset'
missing_tzoffset = (
    weather.select(
        F.count(F.when(F.col(target_col).isNull() | F.isnan(F.col(target_col)), ''))
        .alias(f'missing_{target_col}'))
)
missing_tzoffset.show()

In [None]:
# Count null/NaN values in every weather column except `weather_date`.
# NOTE(review): the date column is presumably excluded because isnan does not
# accept a DateType input — confirm.
cols = weather.columns
del cols[cols.index('weather_date')]
missing_values = weather.select([
    F.count(F.when(F.col(name).isNull() | F.isnan(name), name)).alias(name)
    for name in cols
])

In [None]:
# Display the per-column null/NaN counts.
missing_values.show()

In [None]:
# Fraction (0-1, rounded to 2 decimals) of rows that are null/NaN in each of
# the columns listed in `cols`. The denominator counts all rows.
row_count_expr = F.count(F.lit(1))
perc_missing_values = weather.select([
    F.round(F.count(F.when(F.isnan(name) | F.col(name).isNull(), name)) / row_count_expr, 2).alias(name)
    for name in cols
])
perc_missing_values.show()

In [None]:
# Drop columns where more than 70% of values are missing.
# NOTE(review): the list is hardcoded from inspecting the fractions shown above,
# not derived from them.
# DataFrame.drop ignores names that are not in the schema, so one call handles
# `severerisk` whether or not the source JSON included it.
weather = weather.drop('precipprob', 'snow', 'snowdepth', 'severerisk')

weather.columns

In [None]:
# Redistribute the weather rows across 10 partitions, then persist them as
# Parquet, replacing any previous output.
weather = weather.repartition(10)
weather.write.mode('overwrite').parquet('s3a://hrc-de-data/processed/cycling-dimension/weather/')