## Importaciones

In [None]:
from pyspark.sql import SparkSession
import pandas as pd
import seaborn as sns
import numpy as np
import matplotlib.pyplot as plt

## Apache SPARK y MapReduce

#### Configuraciones SparkSession:
##### - Cantidad de memoria a asignar a los workers
##### - Nombre de la aplicación
##### - Lugar donde está el servidor master

In [None]:
spark = SparkSession\
.builder\
.config("spark.submit.deployMode", "client")\
.config("spark.executor.instances", "1")\
.config("spark.executor.memory", "1g")\
.config("spark.driver.memory", "1g")\
.config("spark.executor.memoryOverhead", "1g")\
.appName("procesandoEgresos2021")\
.master("spark://sparkmaster:7077")\
.getOrCreate()

spark.sparkContext._conf.getAll()

In [None]:
df_girasol = spark.read.csv('hdfs://namenode:9000/cursoFAI/girasol-serie-1969-2019.csv', header=True, inferSchema=True) 

#### Función que mapea por provincia el rendimiento en kg por hectárea y reduce con la función a la suma de los rendimientos históricos por provincia.

<img src="./1.png" style="width:500px">

In [None]:
def mapeoProvinciaRendimiento(df_girasol):
     
    mapeo =  df_girasol.rdd.map(lambda x: (x['provincia_nombre'], x['rendimiento_kgxha']))
    
    resultado = mapeo.reduceByKey(lambda x,y: round(x+y,2) ).sortByKey()
    
    print('<<< Resultado del Reduce >>>')
    print(resultado.collect())
    return resultado

dataProvinciaRendimiento = mapeoProvinciaRendimiento(df_girasol)

#### Calculamos el año record y el peor de producción de girasol (toneladas) en Argentina (1969-2019)

<img src="./2.png" style="width:500px">

In [None]:
def anioRecord(df_girasol):
    
    mapeo =  df_girasol.rdd.map(lambda x: (x['anio'], x['produccion_tm']))
    
    resultado = mapeo.reduceByKey(lambda x,y: round(x+y,2) ).sortBy(lambda x: x[1])
        
    print('<<< Resultado del Reduce >>>')
    print("Peor año = ", resultado.first())
    print("Año récord = ", resultado.max(lambda x: x[1]))
    return resultado.first(), resultado.max(lambda x: x[1])
    
peorAnio = anioRecord(df_girasol)

#### Mostramos gráficamente los resultados

In [None]:
# Obtenemos los datos para graficar
kvRDD1 = spark.sparkContext.parallelize(dataProvinciaRendimiento.collect())
kvRDD1 = kvRDD1.sortBy(lambda x: x[1])

provincias = kvRDD1.keys().collect()
rendimientos = kvRDD1.values().collect()
 
# Figure Size
fig, ax = plt.subplots(figsize =(16, 9))
 
# Horizontal Bar Plot
ax.barh(provincias, rendimientos)
 
# Remove axes splines
for s in ['top', 'bottom', 'left', 'right']:
    ax.spines[s].set_visible(False)

# Remove x, y Ticks
ax.xaxis.set_ticks_position('none')
ax.yaxis.set_ticks_position('none')
 
# Add padding between axes and labels
ax.xaxis.set_tick_params(pad = 5)
ax.yaxis.set_tick_params(pad = 10)
 
# Add x, y gridlines
ax.grid(visible = True, color ='grey',
        linestyle ='-.', linewidth = 0.5,
        alpha = 0.2)
 
# Add annotation to bars
for i in ax.patches:
    plt.text(i.get_width()+0.2, i.get_y()+0.5,
             str(round((i.get_width()), 2)),
             fontsize = 11, fontweight ='bold',
             color ='grey')

# Add Plot Title
ax.set_title('Rendimiento Histórico',
             loc ='center', )
 
# Show Plot
plt.show()

#### Para el peor año de producción, calculamos el porcentaje de cosecha en base a la siembra por provincia

In [None]:
df_provincias = df_girasol.select('provincia_nombre').distinct()

# Filtramos por peor año y nos quedamos con las columnas que nos interesan
df_aux = df_girasol.filter('anio = "'+str(peorAnio[0][0])+'"')
df_aux = df_aux[['provincia_nombre', 'superficie_sembrada_ha', 'superficie_cosechada_ha']]

mapeo_siembra =  df_aux.rdd.map(lambda x: (x['provincia_nombre'], x['superficie_sembrada_ha']))
resultado_siembra = mapeo_siembra.reduceByKey(lambda x,y: round(x+y,2) ).sortByKey().collect()

mapeo_cosecha =  df_aux.rdd.map(lambda x: (x['provincia_nombre'], x['superficie_cosechada_ha']))
resultado_cosecha = mapeo_cosecha.reduceByKey(lambda x,y: round(x+y,2) ).sortByKey().collect()

porcentajes = []

for i in range(len(resultado_siembra)):
    porcentajes.append((resultado_cosecha[i][1] * 100) / resultado_siembra[i][1])
    
df = pd.DataFrame(resultado_siembra, columns=["provincia", "siembra"])
df["porcentaje"] = porcentajes
df = df[['provincia', 'porcentaje']]

In [None]:
import numpy as np

ax = df.plot.bar(x='provincia', y='porcentaje', rot=50,  figsize=(9,7))
ax.get_legend().remove()
ax.set_title('Porcentaje de cosecha en base a la siembra del peor año')
ax.set_xlabel('Provincias')
ax.set_ylabel('Porcentaje')

#### Para el mejor año de producción, calculamos el porcentaje de cosecha en base a la siembra por provincia

In [None]:
df_provincias = df_girasol.select('provincia_nombre').distinct()

# Filtramos por mejor año y nos quedamos con las columnas que nos interesan
df_aux = df_girasol.filter('anio = "'+str(peorAnio[1][0])+'"')
df_aux = df_aux[['provincia_nombre', 'superficie_sembrada_ha', 'superficie_cosechada_ha']]

mapeo_siembra =  df_aux.rdd.map(lambda x: (x['provincia_nombre'], x['superficie_sembrada_ha']))
resultado_siembra = mapeo_siembra.reduceByKey(lambda x,y: round(x+y,2) ).sortByKey().collect()

mapeo_cosecha =  df_aux.rdd.map(lambda x: (x['provincia_nombre'], x['superficie_cosechada_ha']))
resultado_cosecha = mapeo_cosecha.reduceByKey(lambda x,y: round(x+y,2) ).sortByKey().collect()

porcentajes = []

for i in range(len(resultado_siembra)):
    porcentajes.append((resultado_cosecha[i][1] * 100) / resultado_siembra[i][1])
    
df = pd.DataFrame(resultado_siembra, columns=["provincia", "siembra"])
df["porcentaje"] = porcentajes
df = df[['provincia', 'porcentaje']]

In [None]:
ax = df.plot.bar(x='provincia', y='porcentaje', rot=50,  figsize=(9,7))
ax.get_legend().remove()
ax.set_title('Porcentaje de cosecha en base a la siembra del mejor año')
ax.set_xlabel('Provincias')
ax.set_ylabel('Porcentaje')

In [None]:
#cerramos la sesion de spark
spark.sparkContext.stop()

## Análisis

In [None]:
# Recuperamos el dataset
df = pd.read_csv('girasol-serie-1969-2019.csv', encoding=("ISO-8859-1"))

In [None]:
df = df.drop(columns=['campania','provincia_id', 'departamento_nombre', 'departamento_id'])
df.head()

#### Estadística básica

In [None]:
print("Promedio de cada uno de los atributos que nos interesan")
print("-------------------------------------------------------")
print("Superficie sembrada:	", df.superficie_sembrada_ha.mean())
print("Superficie cosechada:	", df.superficie_cosechada_ha.mean())
print("Producción: 		", df.produccion_tm.mean())
print("Rendimiento: 		", df.rendimiento_kgxha.mean())

In [None]:
df2 = df.groupby(["provincia_nombre"])["rendimiento_kgxha"].mean()
df2

In [None]:
sns.displot(df, x="rendimiento_kgxha", bins=20)
plt.savefig('rendimiento_kgxha.png')

In [None]:
sns.stripplot(x="cultivo_nombre",y="rendimiento_kgxha",data=df)

In [None]:
sns.displot(df, x="produccion_tm", bins=10)

In [None]:
sns.stripplot(x="cultivo_nombre",y="produccion_tm",data=df)

In [None]:
print("Comparación mediana - media:")
print("---------------------")
print("Superficie sembrada:	", df.superficie_sembrada_ha.median(), "---", df.superficie_sembrada_ha.mean())
print("Superficie cosechada:	", df.superficie_cosechada_ha.median(), "---", df.superficie_cosechada_ha.mean())
print("Producción:		", df.produccion_tm.median(), "---", df.produccion_tm.mean())
print("Rendimiento:		", df.rendimiento_kgxha.median(), "---", df.rendimiento_kgxha.mean())

In [None]:
sns.boxplot(x="cultivo_nombre",y="rendimiento_kgxha",data=df)

In [None]:
sns.boxplot(x="cultivo_nombre",y="produccion_tm",data=df)

In [None]:
df_aux = df.drop(columns=['cultivo_nombre','anio', 'provincia_nombre'])
sns.pairplot(df_aux)

#### Análisis de Correlación

In [None]:
sns.heatmap(df.corr(),annot=True, cmap="coolwarm")

In [None]:
mask = (df['anio'] >= 2009)
filtered_df =df.loc [mask]
filtered_df.head()

In [None]:
fp = filtered_df.pivot_table(index="provincia_nombre",columns="anio",values="rendimiento_kgxha")
fp.head()

In [None]:
sns.heatmap(fp,linecolor="white",linewidths=2, cmap="coolwarm")

In [None]:
sns.lmplot(x="superficie_sembrada_ha",y="superficie_cosechada_ha",data=filtered_df)

In [None]:
sns.lmplot(x="superficie_cosechada_ha",y="produccion_tm",data=filtered_df)

In [None]:
sns.lmplot(x="superficie_cosechada_ha",y="rendimiento_kgxha",data=filtered_df)

#### Regresión Lineal

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, r2_score
import warnings
with warnings.catch_warnings():
    warnings.simplefilter("ignore")

In [None]:
df.columns

##### Predecir Cosecha

In [None]:
# Features
X = df[["superficie_sembrada_ha"]]

# Intento predecir
y = df["superficie_cosechada_ha"]

In [None]:
# 30% para test
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=101)

###### Por cada unidad que aumente el coeficiente, incrementa o decrementa el rendimiento

In [None]:
# Entrenamos el modelo
lm = LinearRegression()
lm.fit(X_train.values, y_train)
cdf = pd.DataFrame(lm.coef_, X.columns, columns=["Coeficiente"])
cdf

In [None]:
# Predicciones
predictions = lm.predict(X_test.values)
plt.scatter(y_test, predictions)
plt.xlabel("Siembra", size = 12)
plt.ylabel("Cosecha", size = 12)
sns.regplot(data=df,x=y_test,y=predictions,scatter=True,order=1)
plt.savefig('prediccion_cosecha.png')

In [None]:
# Predicción de cosecha para una siembra de 10.000 hectáreas
print(lm.predict([[25000]]))

#### Predecir Producción

In [None]:
# Features
X = df[["superficie_sembrada_ha", "superficie_cosechada_ha"]]

# Intento predecir
y = df["produccion_tm"]

In [None]:
# 30% para test
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=101)

In [None]:
# Entrenamos el modelo
lm = LinearRegression()
lm.fit(X_train.values, y_train)
cdf = pd.DataFrame(lm.coef_, X.columns, columns=["Coeficientes"])
cdf

In [None]:
# Predicciones
predictions = lm.predict(X_test.values)
plt.scatter(y_test, predictions)
plt.xlabel("Producción", size = 12)
sns.regplot(data=df,x=y_test,y=predictions,scatter=True,order=1)

In [None]:
# Predicción de producción para una siembra de 5.000 hectáreas y cosecha de 4900 hectáreas
print(lm.predict([[25000, 23934]]))