In [None]:
# Point this process at the local Spark and Java installations, then call
# findspark.init() so that pyspark can be imported by the cells that follow.
# The environment variables are set before findspark.init() runs.
# NOTE(review): `path_home` is not defined anywhere in the visible notebook. It must
# already exist in the kernel (and end with a path separator, because it is
# concatenated directly with the directory names), otherwise this cell raises NameError.
import os
os.environ["SPARK_HOME"] = path_home + "spark-3.3.2-bin-hadoop3"
os.environ["JAVA_HOME"] = path_home + "jre1.8.0_361"
import findspark
findspark.init()

In [None]:
import pandas as pd
import numpy as np
from datetime import date, timedelta, datetime
import time
import logging

In [None]:
import pyspark # only run this after findspark.init()
from pyspark.sql import SparkSession, SQLContext
from pyspark.context import SparkContext
from pyspark.sql.functions import * 
from pyspark.sql.types import * 

In [None]:
# Create the Spark session used by the exploratory cells, or reuse the one
# that is already running in this kernel.
spark = (
    SparkSession.builder
    .appName('diabetes-test')
    .getOrCreate()
)

In [None]:
# Read the diabetes CSV into a Spark DataFrame: comma-separated, first row is the
# header, column types inferred by Spark.
diabetes_cases = spark.read.load(
    path=path_home + "data/kaggle_diabetes_dataset/diabetes.csv",
    format="csv",
    header="true",
    inferSchema="true",
    sep=",",
)

In [None]:
# Preview: take the first 10 rows and convert them to a pandas DataFrame, which is
# the value this cell displays.
diabetes_cases.limit(10).toPandas()

### Simple PySpark DataFrame operations

In [None]:
# Rename every column by position to a descriptive snake_case name.
diabetes_cases = diabetes_cases.toDF(
    'times_pregnancies',
    'glucose_concentration',
    'diastolic_bloodpressure',
    'skin_thickness',
    'insulin_rate',
    'bmi',
    'diabetes_pedigree',
    'age',
    'outcome',
)


In [None]:
# Descending sort: oldest patients first.
# The column is referenced as "age" (the name assigned by the rename cell) instead of
# "Age", so the lookup no longer depends on Spark's default case-insensitive
# column resolution (spark.sql.caseSensitive=false).
from pyspark.sql import functions as F
diabetes_cases.sort(F.desc("age")).show()

In [None]:
# Patients older than 60 whose glucose concentration lies in the 70-100 range
# (both bounds inclusive), i.e. the "normal" band this notebook is interested in.
(
    diabetes_cases
    .filter(diabetes_cases["age"] > 60)
    .filter(diabetes_cases["glucose_concentration"] >= 70)
    .filter(diabetes_cases["glucose_concentration"] <= 100)
    .show()
)

In [None]:
# Group by age: total number of pregnancies per age, oldest group first.
# The renamed lowercase column "age" is used throughout instead of "Age", so the
# cell does not rely on Spark's default case-insensitive column resolution.
from pyspark.sql import functions as F
diabetes_cases.groupBy("age").agg(F.sum("times_pregnancies")).sort(F.desc("age")).show()

In [None]:
# Compute today's date inside Spark SQL: current Unix time -> TIMESTAMP -> DATE,
# exposed as a one-row DataFrame with a single column `newdate`.
datestamp = spark.sql(
    "\n  SELECT TO_DATE(CAST(UNIX_TIMESTAMP() AS TIMESTAMP)) AS newdate"
)
datestamp.show()

In [None]:

### Get the year from a date in PySpark
 
from pyspark.sql.functions import *

# Add a "year" column holding the calendar year of today's date.
# year() is applied to current_date() so the column contains the year itself,
# not the full date.
df1_test = diabetes_cases.withColumn("year", year(current_date()))
df1_test.show()

### Embed the experimental code above into a pragmatic ETL pipeline

In [None]:
# Module-level switch read by transform_dataset(): the transformations only run
# while this flag is truthy.
cluster_up = True

def spark_session():
    """Return the Spark session for the ETL job, creating it if none exists yet.

    The session is requested under the application name
    ``diabetes-spark-processor``.
    """
    builder = SparkSession.builder.appName('diabetes-spark-processor')
    return builder.getOrCreate()

def extract_dataset(dataset) -> object:
    """Read a comma-separated CSV file with a header row into a Spark DataFrame.

    Args:
        dataset: Path of the CSV file; passed unchanged to ``spark.read.load``.

    Returns:
        The loaded DataFrame (schema inferred by Spark), or ``None`` when
        ``SPARK_HOME`` is unset/empty or the read fails. Failures are logged
        instead of raised, so callers must check for ``None``.
    """
    # .get() instead of indexing: an unset SPARK_HOME is reported and handled
    # like an empty one rather than escaping as a KeyError.
    if not os.environ.get('SPARK_HOME'):
        logging.critical("SPARK_HOME is not set; cannot extract %s", dataset)
        return None
    try:
        # Obtain the session through spark_session() so this function does not
        # depend on a notebook-global `spark` created by an earlier cell.
        return spark_session().read.load(
            dataset,
            format="csv",
            sep=",",
            inferSchema="true",
            header="true",
        )
    except Exception as err:
        logging.critical(err)
        return None

def rename_columns(dataset):
    """Return ``dataset`` with its columns renamed, by position, to snake_case names.

    ``dataset`` must expose ``toDF`` (as a Spark DataFrame does); the names
    below are handed to it in this order.
    """
    snake_case_names = (
        'times_pregnancies',
        'glucose_concentration',
        'diastolic_bloodpressure',
        'skin_thickness',
        'insulin_rate',
        'bmi',
        'diabetes_pedigree',
        'age',
        'outcome',
    )
    return dataset.toDF(*snake_case_names)

def add_date(dataset):
    """Return ``dataset`` with an extra ``date`` column holding the current date.

    Args:
        dataset: DataFrame to extend (anything exposing ``withColumn``).

    Returns:
        The result of ``dataset.withColumn("date", current_date())``.
    """
    # Operate on the argument itself; reading the notebook-global
    # `diabetes_cases` here would silently ignore whatever the caller passed in.
    return dataset.withColumn("date", current_date())

def transform_dataset(dataset):
    """Apply the ETL transformations to ``dataset``: rename columns, then add the date.

    Args:
        dataset: DataFrame to transform, e.g. the result of ``extract_dataset``.

    Returns:
        One transformed DataFrame (``add_date`` applied to the output of
        ``rename_columns``), or ``None`` when ``cluster_up`` is falsy.
    """
    if not cluster_up:
        return None
    # Chain the steps so both apply to the same frame. Returning
    # `rename_columns(dataset), add_date(dataset)` yields a 2-tuple of
    # independently transformed frames, which has no `.write` and therefore
    # cannot be passed to load_dataset().
    return add_date(rename_columns(dataset))
    
def load_dataset(dataset, path="s3a://sparkbyexamples/parquet/people.parquet"):
    """Write ``dataset`` as Parquet to ``path``.

    Args:
        dataset: DataFrame to persist (anything exposing ``write.parquet``).
        path: Destination URI. Defaults to the location this notebook
            previously hard-coded.

    Returns:
        Whatever ``dataset.write.parquet(path)`` returns.
    """
    import boto3_mocking
    # NOTE(review): handler_for('s3') is kept exactly as written. Confirm that
    # boto3_mocking accepts this single-argument form and that it has any effect
    # on the s3a:// writer used here.
    with boto3_mocking.resources.handler_for('s3'):
        # parquet() takes the destination as its argument; `parquet()(path)`
        # called it without a path and then tried to call its return value.
        return dataset.write.parquet(path)

    