# Análisis clima provincias argentinas

## Fetch base de datos psql

In [196]:
import findspark

# Point findspark at the PostgreSQL JDBC driver jar, then initialise findspark.
# Both calls run before the first pyspark import further down.
findspark.add_jars('/app/postgresql-42.1.4.jar')
findspark.init()

In [197]:
from pyspark.sql import SparkSession
# Build (or reuse) the Spark session for this notebook: 512m of memory and a
# single core for both driver and executor, and two shuffle partitions.
_session_builder = SparkSession.builder.appName("pyspark-postgres")
for _conf_key, _conf_value in (
    ("spark.driver.memory", "512m"),
    ("spark.driver.cores", "1"),
    ("spark.executor.memory", "512m"),
    ("spark.executor.cores", "1"),
    ("spark.sql.shuffle.partitions", "2"),
):
    _session_builder = _session_builder.config(_conf_key, _conf_value)
spark = _session_builder.getOrCreate()

In [198]:
# Read the weather.weather_daily table from the "weather" PostgreSQL database
# through the JDBC data source.
# NOTE(review): user and password are hard-coded in the notebook.
df = (
    spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://postgres/weather")
    .option("dbtable", "weather.weather_daily")
    .option("user", "weather")
    .option("password", "weather")
    .option("driver", "org.postgresql.Driver")
    .load()
)

In [199]:
# Print the column names and types of the loaded table.
df.printSchema()

root
 |-- dt: long (nullable = true)
 |-- province: string (nullable = true)
 |-- temp_max: double (nullable = true)
 |-- temp_min: double (nullable = true)
 |-- weather_main: string (nullable = true)
 |-- weather_desc: string (nullable = true)



In [200]:
# Preview the first 10 rows of the loaded table.
df.show(10)

Py4JJavaError: An error occurred while calling o7910.showString.
: java.lang.IllegalStateException: SparkContext has been shutdown
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2053)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3389)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [None]:
from datetime import datetime
from pyspark.sql.functions import udf

def to_datetime(s):
    """Format a Unix timestamp as a ``'YYYY-MM-DD HH'`` string.

    Args:
        s: Seconds since the Unix epoch.

    Returns:
        The local-time date followed by the zero-padded 24-hour hour,
        e.g. ``'2021-04-15 13'``.
    """
    # '%H' is the 24-hour hour. Lowercase '%h' is not a standard Python
    # directive; where the platform accepts it, it yields the abbreviated
    # month name rather than the hour.
    return datetime.fromtimestamp(s).strftime('%Y-%m-%d %H')

def get_year(s):
    """Return the local-time calendar year of Unix timestamp ``s`` as an int."""
    moment = datetime.fromtimestamp(s)
    return moment.year

def get_month(s):
    """Return the local-time month (1-12) of Unix timestamp ``s`` as an int."""
    moment = datetime.fromtimestamp(s)
    return moment.month

def get_day(s):
    """Return the local-time day of the month of Unix timestamp ``s`` as an int."""
    moment = datetime.fromtimestamp(s)
    return moment.day

def get_hour(s):
    """Return the local-time hour (0-23) of Unix timestamp ``s`` as an int."""
    moment = datetime.fromtimestamp(s)
    return moment.hour

def get_season(s):
    """Map a Unix timestamp to a season name based on its day of the year.

    The day of the year is taken from the local-time date of ``s`` and
    matched against fixed windows (start inclusive, end exclusive):
    264-355 is ``'spring'``, 172-264 is ``'winter'``, 80-172 is ``'fall'``;
    any other day is ``'summer'``.

    Args:
        s: Seconds since the Unix epoch.

    Returns:
        One of ``'spring'``, ``'winter'``, ``'fall'`` or ``'summer'``.
    """
    day_of_year = datetime.fromtimestamp(s).timetuple().tm_yday

    # (name, first day, day after the last day); the windows do not overlap.
    season_windows = (
        ('spring', 264, 355),
        ('winter', 172, 264),
        ('fall', 80, 172),
    )
    for season_name, first_day, end_day in season_windows:
        if first_day <= day_of_year < end_day:
            return season_name
    # Days outside every window.
    return 'summer'
    
from pyspark.sql.types import IntegerType, StringType

# Wrap the timestamp helpers as Spark UDFs with explicit return types.
# udf() without a return type produces string columns, which would turn the
# integer date parts into strings and require a cast afterwards.
to_datetime_udf = udf(to_datetime, StringType())
get_year_udf = udf(get_year, IntegerType())
get_month_udf = udf(get_month, IntegerType())
get_day_udf = udf(get_day, IntegerType())
get_hour_udf = udf(get_hour, IntegerType())
get_season_udf = udf(get_season, StringType())

# Derive the calendar columns from the epoch-seconds column `dt`.
df = df.withColumn("datetime", to_datetime_udf(df.dt))
df = df.withColumn("year", get_year_udf(df.dt))
df = df.withColumn("month", get_month_udf(df.dt))
df = df.withColumn("day", get_day_udf(df.dt))
df = df.withColumn("hour", get_hour_udf(df.dt))
df = df.withColumn("season", get_season_udf(df.dt))

In [None]:
# Preview 10 rows, now including the derived date/time and season columns.
df.show(10)

In [None]:
from pyspark.sql.functions import desc
def print_province_weathers(record):
    """Print a province name and its most frequent ``weather_main`` values.

    Reads the module-level ``df``, keeps the rows of the given province,
    counts rows per ``weather_main`` and shows up to 10 groups ordered by
    descending count.

    Args:
        record: Indexable row whose first element is the province name.
    """
    province = record[0]
    print(province)
    province_rows = df.filter(df.province == province)
    weather_counts = province_rows.groupBy('weather_main').count()
    weather_counts.orderBy(desc('count')).show(10)

In [None]:
# Print the weather summary once per distinct province, pulling the province
# rows to the driver through a local iterator.
distinct_provinces = df.select("province").distinct()
for province_row in distinct_provinces.rdd.toLocalIterator():
    print_province_weathers(province_row)


In [None]:
from pyspark.ml.feature import StringIndexer
df.printSchema()
# String columns that get a numeric "<name>_num" index column.
cat_cols = ['weather_main', 'weather_desc', 'province']
for cat_col in cat_cols:
    indexed_col = f'{cat_col}_num'
    # Skip columns whose index column is already present in df.
    if indexed_col in df.columns:
        continue
    index_model = StringIndexer(inputCol=cat_col, outputCol=indexed_col).fit(df)
    df = index_model.transform(df)

# One-hot encode the indexed weather columns
from pyspark.ml.feature import OneHotEncoderEstimator

# Each "<name>_num" index column is encoded into a "<name>_vec" vector column.
weather_bases = ('weather_main', 'weather_desc')
inputCols = [f'{base}_num' for base in weather_bases]
outputCols = [f'{base}_vec' for base in weather_bases]
encoder = OneHotEncoderEstimator(inputCols=inputCols, outputCols=outputCols)
ohem = encoder.fit(df)
df = ohem.transform(df)

# Build the features column
# Keep the province index, both temperature columns, the date parts and the
# one-hot weather vectors. 'temp_min' is listed here alongside 'temp_max';
# listing 'temp_max' twice would drop the minimum temperature and duplicate
# a column name.
# NOTE(review): this overwrites `df`, so the raw string columns are gone for
# any later cell — confirm that is intended.
df = df.select('province_num', 'temp_max', 'temp_min', 'year', 'month', 'day', 'hour', 'weather_main_vec', 'weather_desc_vec')
from pyspark.ml.feature import VectorAssembler

# NOTE(review): 'province' is not among the selected columns, so this filter
# keeps every column, including 'province_num' — confirm whether the province
# index is meant to be part of the feature vector.
assembler = VectorAssembler(inputCols=[c for c in df.columns if c != 'province'], outputCol='features')
assembler.transform(df).show(10)


In [None]:
def _get_typed_cols(df, col_type='cat'):
    """Return the names of the categorical or numeric columns of ``df``.

    Args:
        df: Object exposing ``dtypes`` as ``(column_name, dtype_string)`` pairs.
        col_type: ``'cat'`` selects columns whose dtype starts with
            ``'string'``; ``'num'`` selects those starting with ``'int'`` or
            ``'double'``.

    Returns:
        Matching column names in ``df.dtypes`` order, excluding every name
        listed in the module-level ``PROTECTED_COLS``.

    Raises:
        ValueError: If ``col_type`` is neither ``'cat'`` nor ``'num'``.
    """
    # Explicit check instead of `assert`, which is stripped under `python -O`.
    if col_type not in ('cat', 'num'):
        raise ValueError(f"col_type must be 'cat' or 'num', got {col_type!r}")
    # str.startswith accepts a tuple of prefixes; the trailing comma makes the
    # single-prefix case a real tuple as well.
    # NOTE(review): dtypes such as 'bigint' or 'float' do not match these
    # prefixes — confirm that leaving them out is intended.
    dtype_prefixes = ('int', 'double') if col_type == 'num' else ('string',)
    return [
        name
        for name, dtype in df.dtypes
        if dtype.startswith(dtype_prefixes) and name not in PROTECTED_COLS
    ]

In [None]:
# Column names that _get_typed_cols always leaves out of its result.
PROTECTED_COLS = ['province', 'province_num']

In [None]:
# Split the current columns of df into numeric and categorical name lists.
num_cols = _get_typed_cols(df, col_type='num')
cat_cols = _get_typed_cols(df, col_type='cat')
# Bare tuple as the cell's last expression, for inspection.
df.columns, cat_cols, num_cols

In [None]:
from pyspark.ml.feature import StringIndexer

def _encode_categorical(df):
    """Add a ``<column>_num`` StringIndexer column per categorical column.

    The categorical columns are those reported by
    ``_get_typed_cols(df, col_type='cat')``; their names are printed. A column
    whose ``<column>_num`` counterpart already exists in ``df`` is not
    indexed again, but its name is still part of the returned list.

    Args:
        df: DataFrame to index.

    Returns:
        A ``(df, encoded_cols)`` tuple: the DataFrame with the index columns
        present, and the ``<column>_num`` names in categorical-column order.
    """
    cat_cols = _get_typed_cols(df, col_type='cat')
    print(f"Categorical columns:\n {cat_cols}")
    encoded_cols = [f'{name}_num' for name in cat_cols]
    for source_col, target_col in zip(cat_cols, encoded_cols):
        if target_col in df.columns:
            continue
        index_model = StringIndexer(inputCol=source_col, outputCol=target_col).fit(df)
        df = index_model.transform(df)
    return df, encoded_cols

In [None]:
# Index the categorical columns and keep the names of the index columns.
df, encoded_cols = _encode_categorical(df)
# Display the resulting rows.
df.show()

In [None]:
# Candidate feature list: numeric columns plus the indexed categorical columns.
feature_cols = num_cols + encoded_cols
# Bare name as the cell's last expression, for inspection.
feature_cols

In [None]:
from pyspark.ml.feature import OneHotEncoderEstimator

# Name each one-hot output column after its indexed input column.
ohe_cols = [encoded + '_vec' for encoded in encoded_cols]
encoder = OneHotEncoderEstimator(
    inputCols=encoded_cols,
    outputCols=ohe_cols,
)
ohem = encoder.fit(df)
df = ohem.transform(df)
df.show()
# Feature list now uses the one-hot vectors instead of the raw indices.
feature_cols = num_cols + ohe_cols

In [None]:
from pyspark.ml.feature import VectorAssembler

# Combine every column in feature_cols into a single 'features' vector column.
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features')
df = assembler.transform(df)
# Display only the assembled vectors.
df.select('features').show()

In [None]:
from pyspark.sql.functions import rand, when
# Boolean 'train' flag: True where a seeded random draw exceeds 0.3, False
# otherwise.
is_train_row = rand(seed=1234) > 0.3
df = df.withColumn('train', is_train_row)

In [None]:
# Split on the boolean 'train' flag and keep only the label and the features.
# The label is 'province_num', the column the classifier below is configured
# with; this dataset has no 'survived' column. Indexing the column through
# `df` avoids depending on a functions alias that is not imported here.
train_data = df.filter(df['train']).select('province_num', 'features')
test_data = df.filter(~df['train']).select('province_num', 'features')
train_data.show()

In [None]:
from pyspark.ml.classification import LogisticRegression

# Fit a logistic-regression classifier on the training split, using
# 'province_num' as the label and the assembled 'features' vector as input.
lr = LogisticRegression(labelCol='province_num', featuresCol='features')
lrm = lr.fit(train_data)

In [None]:
# Training-set metrics. areaUnderROC is only defined on the binary summary;
# the label here is a province index (presumably more than two classes —
# confirm), so report the weighted F-measure next to the accuracy instead.
lrm.summary.accuracy, lrm.summary.weightedFMeasure()

In [None]:
# Stop the Spark session created at the top of the notebook.
spark.stop()