# Python

`Python` como lenguaje tiene las siguientes características:

* Alto nivel
* Intepretado
* Orientado a objetos (pero en realidad multiparadigma)


## Ejemplo de programa

Para mostrar las facilidades de comprensión de los programas en `python` observe lo siguiente:

```python

for line in open(”file.txt”):
    for word in line.split():
        if word.endswith(”ing”):
            print word
```

Esto se puede leer línea por línea en inglés

## Funciones en python

En términos básicos una _función_ es una manera de empaquetar código para su posterior reuso.

In [None]:
def repetir(texto, num_veces):
    return texto*num_veces

In [None]:
monty = "Monty Python "
repetir(monty, 3)

In [None]:
repetir("Hola Puno ", 5)

También existen las funciones anónimas o funciones _lambda_

In [None]:
cubo = lambda x: x*x*x

In [None]:
cubo(3)

## Tipos de datos en python

Existen datos de tipo **númericos** (Enteros, flotantes, etc.)

In [None]:
variable_1 = 10

In [None]:
variable_1

Una **cadena** es una secuencia de caracteres.

In [None]:
variable_2 = "John Smith"
variable_2

Una **lista** es un conjunto ordenado de elementos

In [None]:
variable_3 = [10, "John Smith", ['another', 'list']]
variable_3

In [None]:
variable_3[2] = "Adolfo"
variable_3

Una **tupla** es una lista inmutable

In [None]:
variable_4 = (10, "John Smith")
variable_4

Un **diccionario** contiene pares llave-valor (KV)

In [None]:
variable_5 = {"name": "John Smith", "age": 45}
variable_5

In [None]:
variable_5['name']

## Ejemplo de uso de bibliotecas de sistema operativo

In [None]:
import os

In [None]:
os.listdir("data/data_berka/")

Se puede obtener lo mismo con comandos _mágicos_ de `Jupyter`

In [None]:
!ls data/data_berka/

# Apache Spark 

_A fast and general engine for large scale data processing _



- _Framework_ de cómputo general para _clusters_
- Ejecuta en `YARN`
  - Aunque también puede hacer _standalone_, o ejecutar sobre `EC2` o `Mesos`.
- Soporta varios lenguajes
  - `Python`, `Java`,  `Scala` y recientemente en `R`

<img src="Spark-2015-Vision.jpg"/>

## ¿Por qué Apache Spark?


* Algoritmos iterativos
* Exploración interactiva
* Plataforma unificada de análisis de datos (en gran escala)
    - _batch_, interactivo, _streaming_

## Historia de Spark

- `Mesos`, era un _framework_ distribuido creado como proyecto para una clase en UC Berkeley in 2009. 
- `Spark` fué creado para ver si `Mesos` funcionaba.
- Fué abierto a partir de 2010 

## Resilient Distributed Datasets (RDDs)

- Es una de las ideas principales de Spark.
- `RDDs` es una abstracción que representa una colleción de objetos de sólo lectura que está particionada a lo largo de varias máquinas.
- Sus ventajas:
  - Pueden ser reconstruidas a partir de su _lineage_. (Soportan fallos...)
  - Pueden ser accesadas vía operaciones en paralelo, parecidas a MapReduce.
  - Son _cached_ en memoria para su uso inmediato.
  - Fueron construidas para ser almacenadas de manera distribuida.
  - Contienen cualquier tipo de dato (ya sea de `Python`, `R`, `Java` o `Scala`) incluidos tipos definidos por el programador.

- Soportan dos tipos de operaciones
  - *Transformaciones*
  - *Acciones*.

- Las _transformaciones_ construyen un `RDD` nuevo a partir del anterior.
  - Cada transformación queda guardada por =Spark= en el /lineage graph/ un *DAG*.

- Las _acciones_ calculan un resultado basado en el `RDD`.

- La diferencia es que las `RDD` son computadas en forma _lazy_, sólo son ejecutadas hasta la acción.

- Si quieres usarlo una `RDD` varias veces debes de persistirla (con `persist()`).

## Flujo típico de trabajo

1. Crear un `RDD` a partir de datos externos.
2. Transformarlo a nuevos `RDDs`.
3. Persistir algunos `RDDs` para su uso posterior.
4. Lanzar acciones.

#### Ejemplo de flujo de trabajo

In [None]:
accounts = sc.textFile("data/data_berka/account.asc")

In [None]:
accounts.count()

In [None]:
accounts.first()

In [None]:
accounts.take(10)

In [None]:
not_poplatek = accounts.filter(lambda account: "POPLATEK" not in account)

In [None]:
not_poplatek.count()

In [None]:
not_poplatek.first()

In [None]:
def notPopLatek(account):
    return "POPLATEK" not in account

In [None]:
not_poplatek = accounts.filter(notPopLatek)

In [None]:
not_poplatek.count()

In [None]:
not_poplatek.first()

## Spark - RDD API

* [RDD API](http://spark.apache.org/docs/1.3.0/api/scala/index.html#org.apache.spark.rdd.RDD)

* From API docs: "immutable, partitioned collection of elements that can be operated on in parallel"

### Transformaciones

  - `map`
    - Usa una función y la aplica a cada elemento del `RDD`, el resultado se guarda en un nuevo `RDD`.
  - `filter`
    - Usa una función y devuelve sólo los elementos que pasan la función (que devuelven verdadero) en el nuevo `RDD`.
  - `flatMap`
    - Como el `map` pero regresa un iterador por cada elemento
      - Por ejemplo una función que divide una cadena.
  - `distinct`, `sample`
  - `union`, `intersection`, `substract`, `cartesian`


In [None]:
numeros = sc.parallelize([1,2,3,4,5])

In [None]:
numeros.count()

In [None]:
cuadrados = numeros.map(lambda x: x*x).collect()

In [None]:
for cuadrado in cuadrados:
    print("%i " % (cuadrado))

In [None]:
pares = numeros.filter(lambda x: x%2 == 0).collect()

In [None]:
for par in pares:
    print("%i " % (par))

In [None]:
frases = sc.parallelize(["hola estudiantes", "xvii congreso estudiantil"])
palabras = frases.flatMap(lambda frase: frase.split(" "))
palabras.first()

### Acciones

- `reduce`
  - Opera en dos elementos del mismo tipo del `RDD` y regresa un elemento del mismo tipo.
- `aggregate`
  - Nos permite implementar acumuladores.
- `collect`
  - Regresa el `RDD` completo.
- =`ake`
  - Regresa un número =n= de elementos del  =RDD=.
- `count`, `countByValue`, `top`, `foreach`.


In [None]:
suma = numeros.reduce(lambda x, y: x + y)
suma

In [None]:
sumaConteo = numeros.aggregate((0, 0), # Valor inicial
                               (lambda acc, value: (acc[0] + value, acc[1] + 1)), # Combinamos el RDD con el acumulador
                               (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])) # Juntamos ambos acumuladores
                              ) 
                            

In [None]:
sumaConteo[0]

In [None]:
sumaConteo[1]

In [None]:
sumaConteo[0]/float(sumaConteo[1])

In [None]:
numeros.take(2)

In [None]:
numeros.top(3)

In [None]:
numeros.takeSample(withReplacement=True, num=2, seed=1334 )

## Paired RDD

In [None]:
accounts_csv = accounts.\
            filter(lambda line: "account_id" not in line).\
            map(lambda x: x.split(";"))
accounts_csv.take(5)

In [None]:
kv_accounts = accounts_csv.map(lambda x: (x[1], x)) # x[1] contiene el district_id
kv_accounts.take(5)

In [None]:
kv_accounts.keys().first()

In [None]:
kv_accounts.values().first()

In [None]:
kv_accounts.sortByKey().take(10)

In [None]:
kv_districts = sc.textFile("data/data_berka/district.asc").\
                filter(lambda x: "A1" not in x).\
                map(lambda x: x.split(";")).\
                map(lambda x: (x[0], x))
kv_districts.take(5)

In [None]:
accounts_district = kv_accounts.join(kv_districts)
accounts_district.take(5)

# Leer y escribir

## Archivos de texto

Un registro, una línea

In [None]:
clients = sc.textFile("data/data_berka/client.asc")

In [None]:
clients.count()

In [None]:
clients.first()

In [None]:
sampled_clients = clients.sample(withReplacement=False, fraction=0.01, seed=1334)

In [None]:
! rm -R output/sampled_clients/

In [None]:
sampled_clients.saveAsTextFile("output/sampled_clients")

## Archivos CSV

In [None]:
accounts_df = sqlContext.read.format('com.databricks.spark.csv').\
                options(header='true', delimiter=';').\
                load("data/data_berka/account.asc")

In [None]:
! rm -R output/accounts_csv/

In [None]:
accounts_df.select("account_id", "district_id", "frequency").\
            write.format("com.databricks.spark.csv").\
            save("output/accounts_csv")

# SparkSQL

`SparkSQL` además de permitirnos interactuar usando `SQL` con los `RDDs` (en realidad `HQL` _Hive query language_, ver documentación [aquí](https://cwiki.apache.org/confluence/display/Hive/LanguageManual)), agregar una capa de abstracción al `RDD` y lo convierte en un `DataFrame` análogo al usado en `R` y `Python`.

In [None]:
accounts_df.registerTempTable('accounts')

In [None]:
sqlCtx.sql('show tables').show()

In [None]:
sqlCtx.sql('select * from accounts limit 5').show()

## SparkSQL y archivos JSON

En `sparkSQL` es más fácil tratar con archivos `json`

In [None]:
! rm -R data/world_bank*

In [None]:
! wget http://jsonstudio.com/wp-content/uploads/2014/02/world_bank.zip -P data/

In [None]:
! unzip data/world_bank.zip -d data/world_bank

In [None]:
world_bank = sqlCtx.read.json("data/world_bank/world_bank.json")

Automáticamente detecta el _esquema_ de la fuente de datos

In [None]:
world_bank.printSchema()

In [None]:
world_bank.registerTempTable("world_bank_projects")

In [None]:
sqlCtx.sql('show tables').show()

In [None]:
sqlCtx.sql('select countryshortname, project_name, totalamt, totalcommamt from world_bank_projects order by countryshortname').show()

In [None]:
projects_by_country = sqlCtx.sql('select countryshortname as country, count(project_name) as num_projects, sum(totalamt) as total_amount from world_bank_projects group by countryshortname order by countryshortname')
projects_by_country.show()

## SparkSQL y Pandas

Es posible usar `Pandas` para hacer análisis, pero hay que tomar en cuenta que esto manda _todo_ el `dataset` a un sólo nodo (más adelante veremos como usar `MLib` para hacerlo de manera distribuida)

In [None]:
import pandas as pd

In [None]:
projects_by_country_pd = projects_by_country.toPandas()

In [None]:
projects_by_country_pd.columns

In [None]:
projects_by_country_pd.describe()

In [None]:
projects_by_country_pd['country']

In [None]:
projects_by_country_pd=projects_by_country_pd.set_index(['country'])

In [None]:
projects_by_country_pd.num_projects

In [None]:
projects_by_country_pd.head()

In [None]:
projects_by_country_pd.tail()

In [None]:
projects_by_country_pd[10:20]

In [None]:
projects_by_country_pd.ix['Peru']

In [None]:
%pylab inline
projects_by_country_pd['num_projects'][:10].plot(kind='barh', rot=0, )

## Ejemplo básico: WordCount

Descargaremos del [Proyecto Gutenberg](https://www.gutenberg.org) el libro [Beowulf](https://www.gutenberg.org/ebooks/16328) de J. Lesslie Hall.

In [None]:
! wget https://www.gutenberg.org/ebooks/16328.txt.utf-8 -P data/books

In [None]:
def tokenize(texto):
    return texto.split()

Veámos que hace esta función

In [None]:
tokenize("En el bosque, de la China, la chinita se perdió")

In [None]:
beowulf = sc.textFile("data/books/16328.txt.utf-8") # Creamos el RDD desde archivo

In [None]:
wordcount = beowulf.map(lambda line: line.lower()).\
                        flatMap(tokenize).\
                        map(lambda x: (x,1)).\
                        reduceByKey(lambda x, y: x + y).\
                        map(lambda x: (x[1], x[0])).\
                        sortByKey(ascending=False)              

In [None]:
wordcount.take(50)

Como podemos ver, hay muchas palabras que no dicen nada sobre la obra, para hacer _minería de textos_ tendríamos que limpiarlas...

Veámoslo en `pandas`

In [None]:
words_counted = pd.DataFrame(wordcount.collect(), columns=["freq", "word"])
words_counted[:10]

In [None]:
words_counted['freq'][:10]

In [None]:
%pylab inline
fig, ax = plt.subplots(nrows=1, ncols=1, figsize=(16,12));
words_counted['freq'][:100].plot(kind="bar")
ax.set_xticklabels(words_counted['word'][:100]);