#### Este bloque es opcional y es para el caso cuando spark no puede encontrar el home de JAVA (donde esta instalado)

In [None]:
import os
# variable de entorno JAVA_HOME, en este caso para windows (en windows la diagonal es \\ en linux y mac es /)
os.environ["JAVA_HOME"] = "C:\\Program Files\\Java\\jdk1.8.0_271"

#### Algunos módulos importantes
<code>pyspark.sql.functions</code> Contiene funciones internas de sql, pero pueden llamarse fuera de una string query. Más información en el siguiente 
<a href="https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions" target="_blank">link</a>.

In [None]:
import pyspark
import pyspark.sql.functions as F 

#### Inicializar el SparkContex
La config es opcional (puede ser <b>None</b>) Este bloque solo puede ejecutarse una vez ya que solo puede existir un contexto por aplicacion (se puede reiniciar el kernel para destruir el context)

In [None]:
# crear el contexto de spark

conf = pyspark.SparkConf()
conf.set('spark.executor.cores', '4')
conf.set('spark.cores.max', '4')
conf.set('spark.executor.memory', '4g')

sc = pyspark.SparkContext(master="local",appName="MyApp",conf=conf)

#### Cargar el archivo. No olvidar el tipo y ruta del archivo, así como el tipo de diagonal en la ruta, que es \\\\ para windows / para otros OS.

In [None]:
# crear sesion SQL y cargar nuestros datos
sqlContext= pyspark.sql.SparkSession(sc)
input_data = sqlContext.read.csv("D:\\CovidCIMAT\\Input\\ZM_2015_pob.csv",header=True, inferSchema=True)
input_data.printSchema()


#### Esta linea es más que nada para "nombrar" la tabla que se cargo de archivo, para hacer referencia a ella más facil en queries

In [None]:
input_data.createOrReplaceTempView("zonas")

#### Query SQL, el resultado es un DataFrame
se puede imprimir una muestra de los datos con <b>show</b> que tiene dos argumentos opcionales:
- El número de renglones a imprimir (default 20)
- Mostrar la información compactada (default True)

In [None]:
result = sqlContext.sql("SELECT CVE_ZM, NOM_ZM, POB_2015 from zonas")
result.show()

#### Seleccionando columnas desde el dataframe con select
muestro 3 formas de hacerlo (solo quita el simbolo # que es comentario):
- con el nombre de la columna
- con la funcion SQL COL
- seleccionando por columna en DataFrame

In [None]:
result = input_data.select("CVE_ZM","NOM_ZM","POB_2015")
#result = input_data.select(F.col("CVE_ZM"),F.col("NOM_ZM"),F.col("POB_2015"))
#result = input_data.select(input_data["CVE_ZM"],input_data["NOM_ZM"],input_data["POB_2015"])
result.show()

#### Otro ejemplo de consulta SQL

In [None]:
result = sqlContext.sql("SELECT CVE_ZM, FIRST(NOM_ZM) as NOM_ZM, SUM(POB_2015) as POB from zonas group by CVE_ZM order by CVE_ZM")
result.show()

#### Misma consulta, pero usando directamente el DataFrame con funciones SQL
Notese la función <b>alias</b>

In [None]:
result = input_data.select(F.col("CVE_ZM"),F.col("NOM_ZM") ,F.col("POB_2015")
              ).groupBy('CVE_ZM').agg(F.first("NOM_ZM").alias("NOM_ZM"),F.sum("POB_2015").alias("POB")
             ).orderBy('CVE_ZM')
result.show()

#### withColumn
Agrega o reemplaza una columna con los datos proporcionados. Requiere 2 parámetros
- el nombre de la columna nueva (de existir esa columna, los datos se sobreescriben, de lo contrario se agrega una nueva columna)
- la columna de donde se copiaran los datos

En este ejemplo estoy sustituyendo la columna <b>NOM_ZM</b> con los datos de la misma, pero en mayúsculas (función SQL <b>upper</b>)

In [None]:
result=input_data.select("CVE_ZM","NOM_ZM","POB_2015").withColumn("NOM_ZM",F.upper("NOM_ZM"))
result.show()

#### Copiar los datos que hay en el Cluster hacia el Host
Regresa los datos en forma de una lista de renglones <b>Row</b>

In [None]:
result.collect()

#### Otra forma de recuperar datos, pero usando RDD y MAP

In [None]:
result.rdd.map(lambda x:(x.CVE_ZM,x.NOM_ZM,x.POB_2015)).collect()