# Supervised Learning using Logistic Regression

## Objective
- Determine a process of building logistic regression model in big data

## Hypothesis
- Building a logistic regression model in big data requires formal definition of a target

## Procedure

In [1]:
# Run this cell
!pip install pyspark
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [2]:
# Run this cell
!pip install awscli

Collecting awscli
  Downloading awscli-1.35.18-py3-none-any.whl.metadata (11 kB)
Collecting botocore==1.35.52 (from awscli)
  Downloading botocore-1.35.52-py3-none-any.whl.metadata (5.7 kB)
Collecting docutils<0.17,>=0.10 (from awscli)
  Downloading docutils-0.16-py2.py3-none-any.whl.metadata (2.7 kB)
Collecting s3transfer<0.11.0,>=0.10.0 (from awscli)
  Downloading s3transfer-0.10.3-py3-none-any.whl.metadata (1.7 kB)
Collecting colorama<0.4.7,>=0.2.5 (from awscli)
  Downloading colorama-0.4.6-py2.py3-none-any.whl.metadata (17 kB)
Collecting rsa<4.8,>=3.1.2 (from awscli)
  Downloading rsa-4.7.2-py3-none-any.whl.metadata (3.6 kB)
Collecting jmespath<2.0.0,>=0.7.1 (from botocore==1.35.52->awscli)
  Downloading jmespath-1.0.1-py3-none-any.whl.metadata (7.6 kB)
Downloading awscli-1.35.18-py3-none-any.whl (4.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.5/4.5 MB[0m [31m29.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading botocore-1.35.52-py3-none-any.whl (12.7 MB)


In [3]:
# Run this cell
import matplotlib.pyplot as plt
import numpy as np

In [4]:
# Run this cell
import findspark
findspark.init()
from pyspark.sql import SparkSession

spark = SparkSession.builder \
        .master('local[*]') \
        .appName('Basics') \
        .getOrCreate()

print(spark.version)

3.5.3


In [5]:
# Run this cell
!mkdir raw_2178
!ls

raw_2178  sample_data


In [6]:
# Run this cell
!aws s3 cp --recursive --no-sign-request s3://openaq-data-archive/records/csv.gz/locationid=2178/ raw_2178

Completed 232 Bytes/~209.2 KiB (282 Bytes/s) with ~320 file(s) remaining (calculating...)Completed 802 Bytes/~215.8 KiB (963 Bytes/s) with ~330 file(s) remaining (calculating...)download: s3://openaq-data-archive/records/csv.gz/locationid=2178/year=2016/month=03/location-2178-20160307.csv.gz to raw_2178/year=2016/month=03/location-2178-20160307.csv.gz
Completed 802 Bytes/~218.6 KiB (963 Bytes/s) with ~333 file(s) remaining (calculating...)Completed 1.4 KiB/~218.6 KiB (1.7 KiB/s) with ~333 file(s) remaining (calculating...)    Completed 2.1 KiB/~218.6 KiB (2.5 KiB/s) with ~333 file(s) remaining (calculating...)    download: s3://openaq-data-archive/records/csv.gz/locationid=2178/year=2016/month=03/location-2178-20160311.csv.gz to raw_2178/year=2016/month=03/location-2178-20160311.csv.gz
Completed 2.1 KiB/~218.6 KiB (2.5 KiB/s) with ~332 file(s) remaining (calculating...)Completed 2.8 KiB/~225.5 KiB (3.3 KiB/s) with ~342 file(s) remaining (calculating...)Completed 3.0 KiB/~232.9 K

In [7]:
# Run this cell
df_2178 = spark.read.csv('/content/raw_2178/*/*/', inferSchema=True, header=True)
df_2178.show(5)

+-----------+----------+--------------+--------------------+-------+-----------+---------+-----+--------------------+
|location_id|sensors_id|      location|            datetime|    lat|        lon|parameter|units|               value|
+-----------+----------+--------------+--------------------+-------+-----------+---------+-----+--------------------+
|       2178|      3916|Del Norte-2178|2023-12-29T01:00:...|35.1353|-106.584702|      no2|  ppm|              0.0327|
|       2178|      3916|Del Norte-2178|2023-12-29T02:00:...|35.1353|-106.584702|      no2|  ppm|0.033100000000000004|
|       2178|      3916|Del Norte-2178|2023-12-29T03:00:...|35.1353|-106.584702|      no2|  ppm|              0.0329|
|       2178|      3916|Del Norte-2178|2023-12-29T04:00:...|35.1353|-106.584702|      no2|  ppm|              0.0298|
|       2178|      3916|Del Norte-2178|2023-12-29T05:00:...|35.1353|-106.584702|      no2|  ppm|              0.0167|
+-----------+----------+--------------+-----------------

In [8]:
# Run this cell
df_2178.count()

327652

In [9]:
# Run this cell
df_2178.printSchema()

root
 |-- location_id: string (nullable = true)
 |-- sensors_id: string (nullable = true)
 |-- location: string (nullable = true)
 |-- datetime: string (nullable = true)
 |-- lat: string (nullable = true)
 |-- lon: string (nullable = true)
 |-- parameter: string (nullable = true)
 |-- units: string (nullable = true)
 |-- value: string (nullable = true)



In [10]:
# Run this cell
df_2178.createOrReplaceTempView('df_2178_view')

In [11]:
# Run this cell
spark.sql('''
SELECT
  *
FROM
  df_2178_view
LIMIT
  5
''').show()

+-----------+----------+--------------+--------------------+-------+-----------+---------+-----+--------------------+
|location_id|sensors_id|      location|            datetime|    lat|        lon|parameter|units|               value|
+-----------+----------+--------------+--------------------+-------+-----------+---------+-----+--------------------+
|       2178|      3916|Del Norte-2178|2023-12-29T01:00:...|35.1353|-106.584702|      no2|  ppm|              0.0327|
|       2178|      3916|Del Norte-2178|2023-12-29T02:00:...|35.1353|-106.584702|      no2|  ppm|0.033100000000000004|
|       2178|      3916|Del Norte-2178|2023-12-29T03:00:...|35.1353|-106.584702|      no2|  ppm|              0.0329|
|       2178|      3916|Del Norte-2178|2023-12-29T04:00:...|35.1353|-106.584702|      no2|  ppm|              0.0298|
|       2178|      3916|Del Norte-2178|2023-12-29T05:00:...|35.1353|-106.584702|      no2|  ppm|              0.0167|
+-----------+----------+--------------+-----------------

In [12]:
# Run this cell
spark.sql('''
WITH
distinct_parameter AS (
SELECT
  DISTINCT parameter
FROM
  df_2178_view
)
SELECT
  *
FROM
  distinct_parameter
''').show()

+---------+
|parameter|
+---------+
|      so2|
|       co|
|      nox|
|       o3|
|     pm10|
|      no2|
|       no|
|     pm25|
+---------+



In [13]:
# Run this cell
spark.sql('''
WITH
year_month_day_split AS (
SELECT
  *
  ,YEAR(datetime) AS year
  ,LPAD(MONTH(datetime), 2, 0) AS month
  ,LPAD(DAY(datetime), 2, 0) AS day
  ,LPAD(WEEKOFYEAR(datetime), 2, 0) AS weeknum
FROM
  df_2178_view
)
,year_month_day_cte AS (
SELECT
  *
  ,CONCAT(year, '-', month) AS year_month
  ,CONCAT(year, '-', month, '-', day) AS year_month_day
  ,CONCAT(year, '-', weeknum) AS year_weeknum
FROM
  year_month_day_split
)
,monthly_average AS (
SELECT
  year_month
  ,AVG(CASE WHEN parameter IN ('so2') THEN value ELSE NULL END) AS so2_mo_avg
  ,AVG(CASE WHEN parameter IN ('co') THEN value ELSE NULL END) AS co_mo_avg
  ,AVG(CASE WHEN parameter IN ('nox') THEN value ELSE NULL END) AS nox_mo_avg
  ,AVG(CASE WHEN parameter IN ('o3') THEN value ELSE NULL END) AS o3_mo_avg
  ,AVG(CASE WHEN parameter IN ('pm10') THEN value ELSE NULL END) AS pm10_mo_avg
  ,AVG(CASE WHEN parameter IN ('no2') THEN value ELSE NULL END) AS no2_mo_avg
  ,AVG(CASE WHEN parameter IN ('no') THEN value ELSE NULL END) AS no_mo_avg
  ,AVG(CASE WHEN parameter IN ('pm25') THEN value ELSE NULL END) AS pm25_mo_avg
FROM
  year_month_day_cte
GROUP BY
  year_month
)
SELECT
  *
FROM
  monthly_average
LIMIT
  5
''').show()

+----------+--------------------+-------------------+--------------------+--------------------+------------------+--------------------+--------------------+------------------+
|year_month|          so2_mo_avg|          co_mo_avg|          nox_mo_avg|           o3_mo_avg|       pm10_mo_avg|          no2_mo_avg|           no_mo_avg|       pm25_mo_avg|
+----------+--------------------+-------------------+--------------------+--------------------+------------------+--------------------+--------------------+------------------+
|   2024-09|3.687411598302689...|0.21450924608819347|0.005224550898203...| 0.04195977011494253|22.256373937677054| 0.00488752052545156|0.001014970059880...| 5.778470254957506|
|   2024-02|                 0.0| 0.2645264847512038|0.013666144200626952|0.030589062499999996|15.198198198198199|  0.0104687106918239|0.003308777429467...|3.6265765765765754|
|   2023-12|                 0.0| 0.2943641618497111|0.024935483870967733|0.019736918604651155| 15.52991452991453|0.0157

In [14]:
# Run this cell
df_mo = spark.sql('''
WITH
year_month_day_split AS (
SELECT
  *
  ,YEAR(datetime) AS year
  ,LPAD(MONTH(datetime), 2, 0) AS month
  ,LPAD(DAY(datetime), 2, 0) AS day
  ,LPAD(WEEKOFYEAR(datetime), 2, 0) AS weeknum
FROM
  df_2178_view
)
,year_month_day_cte AS (
SELECT
  *
  ,CONCAT(year, '-', month) AS year_month
  ,CONCAT(year, '-', month, '-', day) AS year_month_day
  ,CONCAT(year, '-', weeknum) AS year_weeknum
FROM
  year_month_day_split
)
,monthly_average AS (
SELECT
  year_month
  ,AVG(CASE WHEN parameter IN ('so2') THEN value ELSE NULL END) AS so2_mo_avg
  ,AVG(CASE WHEN parameter IN ('co') THEN value ELSE NULL END) AS co_mo_avg
  ,AVG(CASE WHEN parameter IN ('nox') THEN value ELSE NULL END) AS nox_mo_avg
  ,AVG(CASE WHEN parameter IN ('o3') THEN value ELSE NULL END) AS o3_mo_avg
  ,AVG(CASE WHEN parameter IN ('pm10') THEN value ELSE NULL END) AS pm10_mo_avg
  ,AVG(CASE WHEN parameter IN ('no2') THEN value ELSE NULL END) AS no2_mo_avg
  ,AVG(CASE WHEN parameter IN ('no') THEN value ELSE NULL END) AS no_mo_avg
  ,AVG(CASE WHEN parameter IN ('pm25') THEN value ELSE NULL END) AS pm25_mo_avg
FROM
  year_month_day_cte
GROUP BY
  year_month
)
SELECT
  *
FROM
  monthly_average
''')

In [15]:
# Run this cell
df_mo.show(5)

+----------+--------------------+-------------------+--------------------+--------------------+------------------+--------------------+--------------------+------------------+
|year_month|          so2_mo_avg|          co_mo_avg|          nox_mo_avg|           o3_mo_avg|       pm10_mo_avg|          no2_mo_avg|           no_mo_avg|       pm25_mo_avg|
+----------+--------------------+-------------------+--------------------+--------------------+------------------+--------------------+--------------------+------------------+
|   2024-09|3.687411598302689...|0.21450924608819347|0.005224550898203...| 0.04195977011494253|22.256373937677054| 0.00488752052545156|0.001014970059880...| 5.778470254957506|
|   2024-02|                 0.0| 0.2645264847512038|0.013666144200626952|0.030589062499999996|15.198198198198199|  0.0104687106918239|0.003308777429467...|3.6265765765765754|
|   2023-12|                 0.0| 0.2943641618497111|0.024935483870967733|0.019736918604651155| 15.52991452991453|0.0157

In [16]:
# Run this cell
df_mo_avg = spark.sql('''
WITH
year_month_day_split AS (
SELECT
  *
  ,YEAR(datetime) AS year
  ,LPAD(MONTH(datetime), 2, 0) AS month
  ,LPAD(DAY(datetime), 2, 0) AS day
  ,LPAD(WEEKOFYEAR(datetime), 2, 0) AS weeknum
FROM
  df_2178_view
)
,year_month_day_cte AS (
SELECT
  *
  ,CONCAT(year, '-', month) AS year_month
  ,CONCAT(year, '-', month, '-', day) AS year_month_day
  ,CONCAT(year, '-', weeknum) AS year_weeknum
FROM
  year_month_day_split
)
,monthly_average AS (
SELECT
  year_month
  ,AVG(CASE WHEN parameter IN ('so2') THEN value ELSE NULL END) AS so2_mo_avg
  ,AVG(CASE WHEN parameter IN ('co') THEN value ELSE NULL END) AS co_mo_avg
  ,AVG(CASE WHEN parameter IN ('nox') THEN value ELSE NULL END) AS nox_mo_avg
  ,AVG(CASE WHEN parameter IN ('o3') THEN value ELSE NULL END) AS o3_mo_avg
  ,AVG(CASE WHEN parameter IN ('pm10') THEN value ELSE NULL END) AS pm10_mo_avg
  ,AVG(CASE WHEN parameter IN ('no2') THEN value ELSE NULL END) AS no2_mo_avg
  ,AVG(CASE WHEN parameter IN ('no') THEN value ELSE NULL END) AS no_mo_avg
  ,AVG(CASE WHEN parameter IN ('pm25') THEN value ELSE NULL END) AS pm25_mo_avg
FROM
  year_month_day_cte
GROUP BY
  year_month
)
,monthly_average_with_year_month_day1 AS (
SELECT
   *
   ,CONCAT(year_month, '-01') AS year_month_day1
FROM
  monthly_average
)
SELECT
  *
FROM
  monthly_average_with_year_month_day1
''')

In [17]:
# Run this cell
df_mo_avg.show(5)

+----------+--------------------+-------------------+--------------------+--------------------+------------------+--------------------+--------------------+------------------+---------------+
|year_month|          so2_mo_avg|          co_mo_avg|          nox_mo_avg|           o3_mo_avg|       pm10_mo_avg|          no2_mo_avg|           no_mo_avg|       pm25_mo_avg|year_month_day1|
+----------+--------------------+-------------------+--------------------+--------------------+------------------+--------------------+--------------------+------------------+---------------+
|   2024-09|3.687411598302689...|0.21450924608819347|0.005224550898203...| 0.04195977011494253|22.256373937677054| 0.00488752052545156|0.001014970059880...| 5.778470254957506|     2024-09-01|
|   2024-02|                 0.0| 0.2645264847512038|0.013666144200626952|0.030589062499999996|15.198198198198199|  0.0104687106918239|0.003308777429467...|3.6265765765765754|     2024-02-01|
|   2023-12|                 0.0| 0.2943

In [18]:
# Run this cell
df_mo_avg.summary().show()

+-------+----------+--------------------+-------------------+--------------------+--------------------+------------------+--------------------+--------------------+------------------+---------------+
|summary|year_month|          so2_mo_avg|          co_mo_avg|          nox_mo_avg|           o3_mo_avg|       pm10_mo_avg|          no2_mo_avg|           no_mo_avg|       pm25_mo_avg|year_month_day1|
+-------+----------+--------------------+-------------------+--------------------+--------------------+------------------+--------------------+--------------------+------------------+---------------+
|  count|       102|                 102|                 61|                  20|                 102|               102|                 102|                  20|                99|            102|
|   mean|      NULL|4.211717795191833E-4|0.23120599542858872|0.008325122699020847| 0.03548156235929927|21.006598592013425|0.008794360303114588|0.002065666775987...| 5.803252514888629|           NULL|


In [19]:
# Run this cell
imputer_input = [
    'so2_mo_avg'
    ,'co_mo_avg'
    ,'nox_mo_avg'
    ,'o3_mo_avg'
    ,'pm10_mo_avg'
    ,'no2_mo_avg'
    ,'no_mo_avg'
    ,'pm25_mo_avg'
  ]
imputer_output = [
     'so2_mo_avg_impute'
    ,'co_mo_avg_impute'
    ,'nox_mo_avg_impute'
    ,'o3_mo_avg_impute'
    ,'pm10_mo_avg_impute'
    ,'no2_mo_avg_impute'
    ,'no_mo_avg_impute'
    ,'pm25_mo_avg_impute'
  ]

assembler_input = imputer_output
assembler_output = 'assembler_output'

scaler_input = assembler_output
scaler_output = 'scaler_output'

In [20]:
# Run this cell
from pyspark.ml.feature import VectorAssembler, Imputer, StandardScaler
from pyspark.ml.pipeline import Pipeline

imputer = Imputer(inputCols=imputer_input, outputCols=imputer_output)
assembler = VectorAssembler().setInputCols(assembler_input).setOutputCol(assembler_output)
scaler = StandardScaler(inputCol=scaler_input, outputCol=scaler_output)

pipeline = Pipeline(stages=[imputer, assembler, scaler])

In [21]:
# Run this cell
pipeline_model = pipeline.fit(df_mo_avg)

In [22]:
# Run this cell
df_pipeline = pipeline_model.transform(df_mo_avg)
df_pipeline.show(5)

+----------+--------------------+-------------------+--------------------+--------------------+------------------+--------------------+--------------------+------------------+---------------+--------------------+-------------------+--------------------+--------------------+------------------+--------------------+--------------------+------------------+--------------------+--------------------+
|year_month|          so2_mo_avg|          co_mo_avg|          nox_mo_avg|           o3_mo_avg|       pm10_mo_avg|          no2_mo_avg|           no_mo_avg|       pm25_mo_avg|year_month_day1|   so2_mo_avg_impute|   co_mo_avg_impute|   nox_mo_avg_impute|    o3_mo_avg_impute|pm10_mo_avg_impute|   no2_mo_avg_impute|    no_mo_avg_impute|pm25_mo_avg_impute|    assembler_output|       scaler_output|
+----------+--------------------+-------------------+--------------------+--------------------+------------------+--------------------+--------------------+------------------+---------------+---------------

In [23]:
# Run this cell
from pyspark.ml.functions import vector_to_array

df_array = df_pipeline.withColumn('array_scaler_output', vector_to_array("scaler_output"))
df_array.show(5)

+----------+--------------------+-------------------+--------------------+--------------------+------------------+--------------------+--------------------+------------------+---------------+--------------------+-------------------+--------------------+--------------------+------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+
|year_month|          so2_mo_avg|          co_mo_avg|          nox_mo_avg|           o3_mo_avg|       pm10_mo_avg|          no2_mo_avg|           no_mo_avg|       pm25_mo_avg|year_month_day1|   so2_mo_avg_impute|   co_mo_avg_impute|   nox_mo_avg_impute|    o3_mo_avg_impute|pm10_mo_avg_impute|   no2_mo_avg_impute|    no_mo_avg_impute|pm25_mo_avg_impute|    assembler_output|       scaler_output| array_scaler_output|
+----------+--------------------+-------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------

In [24]:
# Run this cell
df_scale = df_array.withColumn('so2_mo_avg_scale', df_array['array_scaler_output'].getItem(0))\
.withColumn('co_mo_avg_scale', df_array['array_scaler_output'].getItem(1))\
.withColumn('nox_mo_avg_scale', df_array['array_scaler_output'].getItem(2))\
.withColumn('o3_mo_avg_scale', df_array['array_scaler_output'].getItem(3))\
.withColumn('pm10_mo_avg_scale', df_array['array_scaler_output'].getItem(4))\
.withColumn('no2_mo_avg_scale', df_array['array_scaler_output'].getItem(5))\
.withColumn('no_mo_avg_scale', df_array['array_scaler_output'].getItem(6))\
.withColumn('pm25_mo_avg_scale', df_array['array_scaler_output'].getItem(7))

df_scale.show(5)

+----------+--------------------+-------------------+--------------------+--------------------+------------------+--------------------+--------------------+------------------+---------------+--------------------+-------------------+--------------------+--------------------+------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+------------------+------------------+------------------+------------------+------------------+------------------+-------------------+------------------+
|year_month|          so2_mo_avg|          co_mo_avg|          nox_mo_avg|           o3_mo_avg|       pm10_mo_avg|          no2_mo_avg|           no_mo_avg|       pm25_mo_avg|year_month_day1|   so2_mo_avg_impute|   co_mo_avg_impute|   nox_mo_avg_impute|    o3_mo_avg_impute|pm10_mo_avg_impute|   no2_mo_avg_impute|    no_mo_avg_impute|pm25_mo_avg_impute|    assembler_output|       scaler_output| array_scaler_output|  so2_mo_avg

In [25]:
# Run this cell
df_scale[[
    'co_mo_avg_impute', 'nox_mo_avg_impute', 'o3_mo_avg_impute', 'pm10_mo_avg_impute',
    'no2_mo_avg_impute', 'no_mo_avg_impute', 'pm25_mo_avg_impute'
    ]].summary().show()

+-------+-------------------+--------------------+--------------------+------------------+--------------------+--------------------+------------------+
|summary|   co_mo_avg_impute|   nox_mo_avg_impute|    o3_mo_avg_impute|pm10_mo_avg_impute|   no2_mo_avg_impute|    no_mo_avg_impute|pm25_mo_avg_impute|
+-------+-------------------+--------------------+--------------------+------------------+--------------------+--------------------+------------------+
|  count|                102|                 102|                 102|               102|                 102|                 102|               102|
|   mean|0.23120599542858866|0.008325122699020856| 0.03548156235929927|21.006598592013425|0.008794360303114588|0.002065666775987...| 5.803252514888628|
| stddev|0.06977781214148915|0.002620837329690574|0.010050819280426905| 6.517247294211596|0.004528746019265971|0.001015009986187...|1.8984116949267547|
|    min|0.08643015521064301|0.003263969171483623| 0.01771223021582734|11.29613733905579

In [26]:
# Run this cell
df_scale.createOrReplaceTempView('df_scale_view')

In [27]:
# Run this cell
spark.sql('''
WITH
df_scale_raw AS (
SELECT
  *
FROM
  df_scale_view
)
,benchmark AS (
SELECT
  AVG(pm10_mo_avg_impute) AS avg_benchmark
FROM
  df_scale_raw
)
,target AS (
SELECT
  a.*
  ,CASE WHEN a.pm10_mo_avg_impute >= b.avg_benchmark THEN 1 ELSE 0 END AS target
FROM
  df_scale_raw AS a
CROSS JOIN
  benchmark AS b
)
SELECT
  *
FROM
  target
''').show(5)

+----------+--------------------+-------------------+--------------------+--------------------+------------------+--------------------+--------------------+-----------------+---------------+--------------------+-------------------+--------------------+--------------------+------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------+
|year_month|          so2_mo_avg|          co_mo_avg|          nox_mo_avg|           o3_mo_avg|       pm10_mo_avg|          no2_mo_avg|           no_mo_avg|      pm25_mo_avg|year_month_day1|   so2_mo_avg_impute|   co_mo_avg_impute|   nox_mo_avg_impute|    o3_mo_avg_impute|pm10_mo_avg_impute|   no2_mo_avg_impute|    no_mo_avg_impute|pm25_mo_avg_impute|    assembler_output|       scaler_output| array_scaler_output|  so2_mo

In [28]:
# Run this cell
df_target = spark.sql('''
WITH
df_scale_raw AS (
SELECT
  *
FROM
  df_scale_view
)
,benchmark AS (
SELECT
  AVG(pm10_mo_avg_impute) AS avg_benchmark
FROM
  df_scale_raw
)
,target AS (
SELECT
  a.*
  ,CASE WHEN a.pm10_mo_avg_impute >= b.avg_benchmark THEN 1 ELSE 0 END AS target
FROM
  df_scale_raw AS a
CROSS JOIN
  benchmark AS b
)
SELECT
  *
FROM
  target
''')

In [29]:
# Run this cell
df_target.show(5)

+----------+--------------------+-------------------+--------------------+--------------------+------------------+--------------------+--------------------+-----------------+---------------+--------------------+-------------------+--------------------+--------------------+------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------+
|year_month|          so2_mo_avg|          co_mo_avg|          nox_mo_avg|           o3_mo_avg|       pm10_mo_avg|          no2_mo_avg|           no_mo_avg|      pm25_mo_avg|year_month_day1|   so2_mo_avg_impute|   co_mo_avg_impute|   nox_mo_avg_impute|    o3_mo_avg_impute|pm10_mo_avg_impute|   no2_mo_avg_impute|    no_mo_avg_impute|pm25_mo_avg_impute|    assembler_output|       scaler_output| array_scaler_output|  so2_mo

In [30]:
# Run this cell
# from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression

assembler = VectorAssembler(inputCols=['co_mo_avg_scale', 'nox_mo_avg_scale', 'o3_mo_avg_scale'], outputCol='features')
df_assembled = assembler.transform(df_target)

estimator = LogisticRegression(featuresCol='features', labelCol='target')

model = estimator.fit(df_assembled)

df_prediction = model.transform(df_assembled)

In [31]:
# Run this cell
df_prediction.show(5)

+----------+--------------------+-------------------+--------------------+--------------------+------------------+--------------------+--------------------+-----------------+---------------+--------------------+-------------------+--------------------+--------------------+------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------+--------------------+--------------------+--------------------+----------+
|year_month|          so2_mo_avg|          co_mo_avg|          nox_mo_avg|           o3_mo_avg|       pm10_mo_avg|          no2_mo_avg|           no_mo_avg|      pm25_mo_avg|year_month_day1|   so2_mo_avg_impute|   co_mo_avg_impute|   nox_mo_avg_impute|    o3_mo_avg_impute|pm10_mo_avg_impute|   no2_mo_avg_impute|    no_mo_avg_impute|pm25_mo_avg_impu

In [32]:
# Run this cell
# from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(labelCol='target', rawPredictionCol='prediction')
accuracy = evaluator.evaluate(df_prediction)

print('Accuracy: ', accuracy)
print('Coefficients: ', model.coefficients)
print('Intercept: ', model.intercept)

Accuracy:  0.5444906444906444
Coefficients:  [-0.18366279132438842,-0.4251758418083683,0.14593012533142258]
Intercept:  0.8321915942343122


In [33]:
# Run this cell
from pyspark.sql.types import *

df_prediction = df_prediction.withColumn('label', df_prediction['target'].cast(DoubleType()))
df_prediction.show(5)

+----------+--------------------+-------------------+--------------------+--------------------+------------------+--------------------+--------------------+-----------------+---------------+--------------------+-------------------+--------------------+--------------------+------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------+--------------------+--------------------+--------------------+----------+-----+
|year_month|          so2_mo_avg|          co_mo_avg|          nox_mo_avg|           o3_mo_avg|       pm10_mo_avg|          no2_mo_avg|           no_mo_avg|      pm25_mo_avg|year_month_day1|   so2_mo_avg_impute|   co_mo_avg_impute|   nox_mo_avg_impute|    o3_mo_avg_impute|pm10_mo_avg_impute|   no2_mo_avg_impute|    no_mo_avg_impute|pm25_mo_av

In [34]:
# Run this cell
df_prediction.createOrReplaceTempView('df_prediction_view')

In [35]:
# Run this cell
spark.sql('''
SELECT
  label
  ,COUNT(label) AS cnt_label
FROM
  df_prediction_view
GROUP BY
  label
''').show()

+-----+---------+
|label|cnt_label|
+-----+---------+
|  0.0|       65|
|  1.0|       37|
+-----+---------+



In [36]:
# Run this cell
spark.sql('''
SELECT
  prediction
  ,COUNT(prediction) AS cnt_prediction
FROM
  df_prediction_view
GROUP BY
  prediction
''').show()

+----------+--------------+
|prediction|cnt_prediction|
+----------+--------------+
|       0.0|            94|
|       1.0|             8|
+----------+--------------+



In [37]:
# Run this cell
from pyspark.mllib.evaluation import MulticlassMetrics

prediction_and_label = df_prediction.select(['prediction','label'])
metrics = MulticlassMetrics(prediction_and_label.rdd.map(tuple))
print(metrics.confusionMatrix().toArray())



[[62.  3.]
 [32.  5.]]


## Observation and Analysis

Based on my observations, this procedure revealed the importance of data preparation, feature engineering, and model performance, and it has been able to identify the areas where logistic regression worked and the areas that need more refinement. Furthermore, I think that data imputation helped in handling missing values for key pollutant metrics; this ensured that even though some values were missing, each pollutant's monthly average was represented. The procedure then scaled those imputed features onto a similar range so that every metric could have equal contribution without being overpowered by any of the pollutant scales. This preprocessing was necessary in getting the data ready for logistic regression, sensitive to feature scale.

In this procedure, it set the target variable equal to an indicator of if it was in excess of that average during a given month so the instances could always be assigned as either a "true" or false with target values of 1 or 0 based on being over or at benchmark, thus exactly what this model could be trained and tuned with, since that's all logistic regression delivers, determining whether months will have the higher or smaller values for `pm10`. From the confusion matrix, the better classification was found with the "no exceedance," while "exceedance" could not be well classified with high accuracy and showed a limitation either of the model or of feature for the classification of patterns of exceedance.

Examining the procedure, I noticed a severe class imbalance condition because it has a considerable no exceedance against many exceedance. This class distribution most certainly would have declined the performance since logistic regressions are easily skewed and biased towards majorities and, in any case, the coefficients the model provide useful in pointing those who should have a good or possibly stronger relationship on their relations with the existence of probabilities of `pm10` exceeding a certain threshold-even such relationships might perhaps too oversimplified for complete representation and capture by logistic regression.

## Conclusion

The process effectively demonstrated how to construct a logistic regression model that predicts `pm10` exceedance, meeting the objective of building a baseline model and offering insights into relationships between pollutants. However, while the model gave some useful indications, the low accuracy suggests that more improvements could have been achieved by using more complex modeling approaches. I think that maybe logistic regression is too simple to handle the complexity of data. Therefore, checking other models could perform better and handle non-linear relationships in the data. Moreover, I think some techniques or adjusting class weights might help to reduce the effects of class imbalance.

Therefore, I conclude that, this more refined alternative of the model of logistic regression with another model and additional feature engineering would most certainly yield stronger predictions and more profound comprehension regarding the structural properties of the data and pollutant dynamics.

# *Required Challenge*

1. Transform `o3_mo_avg_impute` as target for logistic regression modeling. Apply the rules below:
- If value >= average, assign 1
- If value < average, assign 0

2. Use the following variables below as input to the predictive model:
- `co_mo_avg_scale`
- `nox_mo_avg_scale`
- `pm10_mo_avg_scale`


In [47]:
# Note: Executed the required challenge and procedure with assistance to online tools available

# This is the SQL view for easier querying
df_scale.createOrReplaceTempView('df_scale_view')

# Calculating average for o3_mo_avg_impute and create target column
df_target_o3 = spark.sql('''
WITH
df_raw AS (
    SELECT *
    FROM df_scale_view
),
benchmark AS (
    SELECT
        AVG(o3_mo_avg_impute) AS avg_o3_benchmark
    FROM
        df_raw
)
SELECT
    df_raw.*,
    CASE
        WHEN df_raw.o3_mo_avg_impute >= benchmark.avg_o3_benchmark THEN 1
        ELSE 0
    END AS target
FROM
    df_raw
CROSS JOIN
    benchmark
''')

# Displays the transformed target data
df_target_o3.show(5)

+----------+--------------------+-------------------+--------------------+--------------------+------------------+--------------------+--------------------+-----------------+---------------+--------------------+-------------------+--------------------+--------------------+------------------+--------------------+--------------------+------------------+--------------------+--------------------+--------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+------+
|year_month|          so2_mo_avg|          co_mo_avg|          nox_mo_avg|           o3_mo_avg|       pm10_mo_avg|          no2_mo_avg|           no_mo_avg|      pm25_mo_avg|year_month_day1|   so2_mo_avg_impute|   co_mo_avg_impute|   nox_mo_avg_impute|    o3_mo_avg_impute|pm10_mo_avg_impute|   no2_mo_avg_impute|    no_mo_avg_impute|pm25_mo_avg_impute|    assembler_output|       scaler_output| array_scaler_output|  so2_mo

In [39]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

# Defining the input features for VectorAssembler
assembler = VectorAssembler(inputCols=['co_mo_avg_scale', 'nox_mo_avg_scale', 'pm10_mo_avg_scale'], outputCol='features')

# Transforming data using the assembler
df_model_input = assembler.transform(df_target_o3).select('features', 'target')
df_model_input.show(5)

+--------------------+------+
|            features|target|
+--------------------+------+
|[2.20200073540904...|     1|
|[3.07417557967038...|     1|
|[3.31346008613411...|     0|
|[3.31346008613411...|     1|
|[3.31346008613411...|     1|
+--------------------+------+
only showing top 5 rows



In [40]:
# Initializing the logistic regression or the model
lr = LogisticRegression(labelCol='target', featuresCol='features')

# Training the model
lr_model = lr.fit(df_model_input)

# Print model coefficients and intercept
print("Coefficients:", lr_model.coefficients)
print("Intercept:", lr_model.intercept)

Coefficients: [-0.8585465353712822,-7.663934458977952,0.4359472599117276]
Intercept: 25.810045980586192


In [41]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

# This creates predictions on the training dataset
df_predictions = lr_model.transform(df_model_input)

# This evaluates the model accuracy
evaluator = BinaryClassificationEvaluator(labelCol='target', rawPredictionCol='prediction')
accuracy = evaluator.evaluate(df_predictions)
print("Model Accuracy:", accuracy)

Model Accuracy: 0.687111801242236


In [42]:
# This is an SQL to view for predictions to analyze prediction counts
df_predictions.createOrReplaceTempView('df_predictions_view')

# Count of each prediction class
spark.sql('''
SELECT
    prediction,
    COUNT(prediction) AS count_prediction
FROM
    df_predictions_view
GROUP BY
    prediction
''').show()

# Count of each target class
spark.sql('''
SELECT
    target,
    COUNT(target) AS count_target
FROM
    df_predictions_view
GROUP BY
    target
''').show()

+----------+----------------+
|prediction|count_prediction|
+----------+----------------+
|       0.0|              50|
|       1.0|              52|
+----------+----------------+

+------+------------+
|target|count_target|
+------+------------+
|     1|          56|
|     0|          46|
+------+------------+



In [45]:
from pyspark.sql.types import DoubleType
from pyspark.mllib.evaluation import MulticlassMetrics

# Casting both `prediction` and `target` columns to DoubleType
df_predictions = df_predictions.withColumn('prediction', df_predictions['prediction'].cast(DoubleType()))
df_predictions = df_predictions.withColumn('target', df_predictions['target'].cast(DoubleType()))

# Converting predictions to an RDD of tuples (prediction, target) for confusion matrix
prediction_and_labels = df_predictions.select(['prediction', 'target']).rdd.map(tuple)

# Calculating and printing the confusion matrix
metrics = MulticlassMetrics(prediction_and_labels)
print("Confusion Matrix:\n", metrics.confusionMatrix().toArray())

Confusion Matrix:
 [[32. 14.]
 [18. 38.]]
