# Intro to Spark
Ejemplos prácticos
---

Crear una sesión de Spark

In [None]:
### Con SparkSession
from pyspark.sql import SparkSession

# crear sesión

spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .config("local[*]") \
        .getOrCreate()

In [None]:
## parar la sesión
spark.stop()

In [None]:
## Con SparkContext y SQLContext
from pyspark import SparkContext
from pyspark import SQLContext

## crear el contexto
sc = SparkContext("local[*]", "pyspark_df")
ss = SQLContext(sc)


In [None]:
### parar el contexto
sc.stop()

---

In [None]:
### Crear un Dataframe a partir de un csv
dfs_flights = spark.read.option("inferSchema", "true").option("header", "true").csv('data/2015-summary.csv')

In [None]:
### Inspeccionar el Dataframe
dfs_flights.printSchema()

In [None]:
## Renombrar la cplumna count

dfs_flights = dfs_flights.withColumnRenamed('count', 'NumFlights')

In [None]:
dfs_flights.printSchema()

In [None]:
### Ordenar según la columna count
dfs_fligths_sorted = dfs_flights.sort("NumFlights", ascending=False) # no tarda nada porque es una transformación

In [None]:
dfs_fligths_sorted.show(10) ## al hacerle el show realiza el sort y nos devuelve los 10 primeros

In [None]:
### group by
count_per_dest = dfs_flights.groupBy("DEST_COUNTRY_NAME").sum()

In [None]:
### Enseña el dataframe count_per_dest
count_per_dest.show()

In [None]:
## Renombra la columna sum(NumFlights)
count_per_dest = count_per_dest.withColumnRenamed('sum(NumFlights)', 'total_flights')
count_per_dest.show()

In [None]:
### Ordena por número de vuelos
most_flights = count_per_dest.sort('total_flights', ascending=False)

In [None]:
### Top 5
most_flights.show(5)

### Querying the data with SQL

In [None]:
# Create temp table

df_flights.createOrReplaceTempView("flights_2015")

In [None]:
query = """
SELECT *
FROM flights_2015
ORDER BY count DESC
LIMIT 10
"""

In [None]:
query_result = spark.sql(query)

In [None]:
type(query_result)

In [None]:
query_result.show()

In [None]:
# Calcula el total de vuelos en función del destino y saca el destino más frecuente

## Funcionalidad dataframes

Muchas veces nos encontraremos con que queremos utilizar algua funcion integrada en los dataframes, cómo en pandas.

Para poder acceder a estas funciones, tendremos que importarlas de la librería

In [None]:
from pyspark.sql.functions import max

In [None]:
spark.sql("SELECT max(count) from flights_2015").take(1)

In [None]:
dfs_flights.select(max("NumFlights")).take(1)

### Intefración con pandas

In [None]:
import pandas as pd

In [None]:
df = dfs_flights.toPandas()

In [None]:
df.head()

In [None]:
df = dfs_flights.groupBy("DEST_COUNTRY_NAME").sum("NumFlights")\
           .withColumnRenamed("sum(NumFlights)", "destination_total")\
           .sort(desc("destination_total"))\
           .limit(10)\
           .toPandas()

In [None]:
%matplotlib inline

In [None]:
df.set_index('DEST_COUNTRY_NAME').plot(kind='barh')

In [None]:
## from pandas to SparkDataframe

df_test = spark.createDataFrame(df)

In [None]:
spark.stop()

## Ejercicio
---

Haz un análisis exploratorio del fichero 'data/201508_trip_data.csv' usando spark y plotea los resultados usando las funcionalidades de python

In [None]:
## importa lo necesario para crear el entorno de spark

### Con SparkSession
from pyspark.sql import SparkSession

# crear sesión

spark = SparkSession.builder.appName("Bikes").config("local[*]").getOrCreate()

In [None]:
## carga el fichero 'data/201508_trip_data.csv'
path = 'data/201508_trip_data.csv'

dfs_bikes = spark.read.option('inferSchema', True).option('header', True).csv(path)

In [None]:
# Imprime el esquema y las columnas del dfs_bikes

dfs_bikes.printSchema()

In [None]:
## Haz una descripción de la columnas Duration del dfs_bikes
## En que unidades está la duración ?
dfs_bikes.select('Duration').describe().show()

In [None]:
## Cual son las 5 estaciones más transitada?

most_frequent = dfs_bikes.select('Zip Code', 'Duration').groupby('Zip Code').count()\
                .withColumnRenamed('count', 'num_trips')\
                .sort('num_trips', ascending=False).limit(5)

In [None]:
## Repite el ejercicio anterior usando SQL

## Cambia el nombre a la columna para que no tenga espacios
dfs_bikes = dfs_bikes.withColumnRenamed('Zip Code', 'zipcode')

## Crea una tabla temporal
dfs_bikes.createOrReplaceTempView('bike_trips')

## Escribe una query sql que realice la tarea de antes
query = """

SELECT zipcode, count(1) as num_trips
FROM bike_trips
GROUP BY zipcode
ORDER BY num_trips DESC
LIMIT 5
"""

most_frequent_sql = spark.sql(query)

In [None]:
### Transform it to pandas and plot the result
