Docker
==

Docker es una herramienta que ya lleva varios años tomando popularidad en los ambientes de desarrollo de software. Este popularidad se va extendiendo a otras areas más allá del desarrollo de software. ¿Por qué?

1. Motivación al uso de docker
2. Dockerfile
3. Dockerfile en la ciencia de datos


### Bibliografía

1. [Docker for Data Science: Building Scalable and Extensible Data Infrastructure Around the Jupyter Notebook Server](https://libgen.is/book/index.php?md5=7521B3D87DF447AB03F259271A5C2149)
2. [Learning PySpark](https://libgen.is/book/index.php?md5=8A78A93D5EB86F511A74910DC09E3DD6)

Docker + Pyspark
==

Si usted no sabe pyspark no sabe big data, o por lo menos, big data actual.

![](https://echanclarityinsights.github.io/images/2018-12-23/spark-meme.jpg)

In [1]:
import pyspark
from pyspark.context import SparkContext
from pyspark.sql.context import SQLContext
from pyspark.sql.session import SparkSession 

In [3]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.hadoop:hadoop-aws:2.7.3 pyspark-shell'

In [4]:
spark = SparkSession.builder.master("local").getOrCreate()

Tareas guiadas:
0. Construir unas credenciales para el cliente en Amazon Web Services
1. Leer Iris con pyspark
2. Leer Iris con pyspark y guardarlo como parquet
3. Guardarlo en un bucket s3 particionado por tipo de flor
4. Encontrar mediante sql las dos flores del mismo tipo más "diferentes" mediante distancia Euclidiana

**Ronald Fisher** tiene como idea determinar cuales pares de flores del tipo iris son los más distintos posibles pero que son del mismo tipo, diseñe un programa en pyspark que de respuesta ¿cuáles son los pares más distintos posibles siendo que son del mismo tipo de flor?

In [37]:
spark.read.csv("iris.csv")

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string]

In [38]:
spark.read.options(header=True).csv("iris.csv")

DataFrame[sepal.length: string, sepal.width: string, petal.length: string, petal.width: string, variety: string]

In [39]:
iris = spark.read.options(header=True, inferSchema=True).csv("iris.csv")

In [40]:
from pyspark.sql.functions import monotonically_increasing_id

In [41]:
iris = iris.withColumn("id", monotonically_increasing_id())

In [42]:
iris.show()

+------------+-----------+------------+-----------+-------+---+
|sepal.length|sepal.width|petal.length|petal.width|variety| id|
+------------+-----------+------------+-----------+-------+---+
|         5.1|        3.5|         1.4|        0.2| Setosa|  0|
|         4.9|        3.0|         1.4|        0.2| Setosa|  1|
|         4.7|        3.2|         1.3|        0.2| Setosa|  2|
|         4.6|        3.1|         1.5|        0.2| Setosa|  3|
|         5.0|        3.6|         1.4|        0.2| Setosa|  4|
|         5.4|        3.9|         1.7|        0.4| Setosa|  5|
|         4.6|        3.4|         1.4|        0.3| Setosa|  6|
|         5.0|        3.4|         1.5|        0.2| Setosa|  7|
|         4.4|        2.9|         1.4|        0.2| Setosa|  8|
|         4.9|        3.1|         1.5|        0.1| Setosa|  9|
|         5.4|        3.7|         1.5|        0.2| Setosa| 10|
|         4.8|        3.4|         1.6|        0.2| Setosa| 11|
|         4.8|        3.0|         1.4| 

In [44]:
iris.write.mode("overwrite").parquet("./iris")

In [45]:
iris2 = iris

iris_join = iris.join(iris2, iris.variety == iris2.variety)

In [46]:
iris_join.columns

['sepal.length',
 'sepal.width',
 'petal.length',
 'petal.width',
 'variety',
 'id',
 'sepal.length',
 'sepal.width',
 'petal.length',
 'petal.width',
 'variety',
 'id']

In [47]:
iris_join = iris_join.toDF(*["sepal_length", 
                     "sepal_width", 
                     "petal_length", 
                     "petal_width", 
                     "variety", 
                     "id",
                     "sepal_length2", 
                     "sepal_width2", 
                     "petal_length2", 
                     "petal_width2", 
                     "variety2",
                     "id2"])

In [49]:
iris_join = iris_join.withColumn("distancia", (iris_join.sepal_length - iris_join.sepal_length2)**2  + (iris_join.sepal_width - iris_join.sepal_width2)**2)

In [60]:
from pyspark.sql.functions import max

resume = iris_join.groupBy("variety").agg(max("distancia").alias("distancia"))

In [66]:
(iris_join
     .join(resume,iris_join.distancia ==  resume.distancia, "inner")
     .select(iris_join.variety2, iris_join.id,iris_join.id2,iris_join.distancia)).show()

+----------+---+---+-----------------+
|  variety2| id|id2|        distancia|
+----------+---+---+-----------------+
|    Setosa| 15| 41|5.850000000000002|
|    Setosa| 41| 15|5.850000000000002|
|Versicolor| 50| 60|             5.44|
|Versicolor| 60| 50|             5.44|
| Virginica|106|131|            10.69|
| Virginica|131|106|            10.69|
+----------+---+---+-----------------+



In [11]:
"""
spark.conf.set("fs.s3n.awsAccessKeyId", aws_access_key_id)
spark.conf.set("fs.s3n.awsSecretAccessKey", aws_secret_access_key)
spark.conf.set("fs.s3.awsAccessKeyId", aws_access_key_id)
spark.conf.set("fs.s3.awsSecretAccessKey", aws_secret_access_key)
"""

'\nspark.conf.set("fs.s3n.awsAccessKeyId", aws_access_key_id)\nspark.conf.set("fs.s3n.awsSecretAccessKey", aws_secret_access_key)\nspark.conf.set("fs.s3.awsAccessKeyId", aws_access_key_id)\nspark.conf.set("fs.s3.awsSecretAccessKey", aws_secret_access_key)\n'