# Data processing course assignments
---

In [1]:
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import asc, lit, udf, sum
from pyspark.sql.types import *
import re

In [2]:
conf = SparkConf().setAppName('Assignment').setMaster('local')
spark = SparkContext(conf=conf)

## Ejercicio 1.
#### Leer el archivo data/containers.csv y contar el número de líneas.
---

In [3]:
path = "/home/jovyan/work/data_processing_course-master/assignments/data/containers.csv"
csvdata = spark.textFile(path) #.option("header", "false").load(path)

In [4]:
csvdata.take(2)

['ship_imo;ship_name;country;departure;container_id;container_type;container_group;net_weight;gross_weight;owner;declared;contact;customs_ok',
 'AMC1861710;Jayden;BD;201602183;FCUK1755843;4960;28VH;44804866.62;2240243.33;Streich-Wilkinson;Music, Tools, Automotive & Health;octavia@stammbednar.name;true']

In [5]:
csvdata.count()

614

## Ejercicio 2.
#### Leer el archivo data/containers.csv y filtrar aquellos contenedores cuyo ship_imo es DEJ1128330 y el grupo del contenedor es 22P1. Guardar los resultados en un archivo de texto en resultados/resutado_2.
---

In [6]:
print(type(csvdata))
datardd = csvdata
print(type(datardd))

datasplit = datardd.map(lambda row: row.split(";"))

datasplit.take(2)

<class 'pyspark.rdd.RDD'>
<class 'pyspark.rdd.RDD'>


[['ship_imo',
  'ship_name',
  'country',
  'departure',
  'container_id',
  'container_type',
  'container_group',
  'net_weight',
  'gross_weight',
  'owner',
  'declared',
  'contact',
  'customs_ok'],
 ['AMC1861710',
  'Jayden',
  'BD',
  '201602183',
  'FCUK1755843',
  '4960',
  '28VH',
  '44804866.62',
  '2240243.33',
  'Streich-Wilkinson',
  'Music, Tools, Automotive & Health',
  'octavia@stammbednar.name',
  'true']]

In [7]:
datasplit.map(lambda row: row[0]).take(10)

['ship_imo',
 'AMC1861710',
 'POG1615575',
 'SQH1155999',
 'JCI1797526',
 'MBV1836745',
 'GYR1192020',
 'GLV1922612',
 'NLH1771681',
 'FUS1202266']

In [8]:
datasplit.filter(lambda row: row[6] == '22P1' and row[0] == 'DEJ1128330').count()

2

In [9]:
datasplit.filter(lambda row: row[6] == '22P1' and row[0] == 'DEJ1128330').take(2)

[['DEJ1128330',
  'Tiara',
  'GP',
  '2016021818',
  'GYFD1228113',
  '20PF',
  '22P1',
  '51503716.88',
  '5150371.69',
  'Armstrong-Goldner',
  'Automotive, Sports, Games & Clothing',
  'caria@cronin.io',
  'true'],
 ['DEJ1128330',
  'Tiara',
  'GP',
  '2016021818',
  'MBPF1909627',
  '24H2',
  '22P1',
  '37266600.88',
  '1863330.04',
  'Lehner-Hamill',
  'Jewelery, Automotive, Games & Electronics',
  'phoebe@volkman.net',
  'true']]

Guardando los datos como archivo de texto

In [10]:
x1 = datasplit.filter(lambda row: row[6] == '22P1' and row[0] == 'DEJ1128330')
x1.saveAsTextFile("/home/jovyan/work/data_processing_course-master/assignments/data/111.csv")

## Ejercicio 3.
#### Leer el archivo data/containers.csv y convertir a formato Parquet. Recuerda que puedes hacer uso de la funcion parse_container en helpers.py tal y como vimos en clase. Guarda los resultados en resultados/resultado_3.
---

In [10]:
path = "/home/jovyan/work/data_processing_course-master/assignments/data/containers.csv" #Usamos el mismo path creado previamente

In [11]:
sqlContext = SQLContext(spark)

In [12]:
df = sqlContext.read.format('com.databricks.spark.csv').options(header = "true", sep = ';').load(path)

In [13]:
df.first()

Row(ship_imo='AMC1861710', ship_name='Jayden', country='BD', departure='201602183', container_id='FCUK1755843', container_type='4960', container_group='28VH', net_weight='44804866.62', gross_weight='2240243.33', owner='Streich-Wilkinson', declared='Music, Tools, Automotive & Health', contact='octavia@stammbednar.name', customs_ok='true')

In [16]:
df.write.parquet("ejercicios33.parquet")

## Ejercicio 4.
#### Lee el archivo de Parquet guardado en el ejercicio 3 y filtra los barcos que tienen al menos un contenedor donde la columna customs_ok es igual a false. Extrae una lista con los identificadores de barco, ship_imo, sin duplicados y ordenados alfabéticamente, en formato json.
---

In [14]:
data4 = sqlContext.read.parquet("ejercicios3.parquet")
data4.registerTempTable("parquetFile")

In [15]:
sqlContext.sql("SELECT DISTINCT ship_imo FROM parquetFile WHERE customs_ok = 'false'").show()

+----------+
|  ship_imo|
+----------+
|KSP1096387|
|GYR1192020|
|JET1053895|
|SQH1155999|
|NLH1771681|
|JCI1797526|
|GEU1548633|
|AEY1108363|
|IWE1254579|
|AMC1861710|
|POG1615575|
|MBV1836745|
|GLV1922612|
|YZX1455509|
|TCU1641123|
|JMP1637582|
|DEJ1128330|
|RYP1117603|
|FUS1202266|
|NCZ1777367|
+----------+



In [16]:
data4 = sqlContext.sql("SELECT DISTINCT ship_imo FROM parquetFile WHERE customs_ok = 'false'")

In [17]:
sqlContext.sql("SELECT DISTINCT ship_imo FROM parquetFile WHERE customs_ok = 'false'").count()

20

In [18]:
data4.sort(asc("ship_imo")).collect()

[Row(ship_imo='AEY1108363'),
 Row(ship_imo='AMC1861710'),
 Row(ship_imo='DEJ1128330'),
 Row(ship_imo='FUS1202266'),
 Row(ship_imo='GEU1548633'),
 Row(ship_imo='GLV1922612'),
 Row(ship_imo='GYR1192020'),
 Row(ship_imo='IWE1254579'),
 Row(ship_imo='JCI1797526'),
 Row(ship_imo='JET1053895'),
 Row(ship_imo='JMP1637582'),
 Row(ship_imo='KSP1096387'),
 Row(ship_imo='MBV1836745'),
 Row(ship_imo='NCZ1777367'),
 Row(ship_imo='NLH1771681'),
 Row(ship_imo='POG1615575'),
 Row(ship_imo='RYP1117603'),
 Row(ship_imo='SQH1155999'),
 Row(ship_imo='TCU1641123'),
 Row(ship_imo='YZX1455509')]

In [19]:
data4.sort(asc("ship_imo")).toJSON("data_processing_course-master/assignments/resultados/resultado_4")

MapPartitionsRDD[61] at toJavaRDD at NativeMethodAccessorImpl.java:0

## Ejercicio 5.
#### Crea una UDF para validar el código de identificación del contenedor container_id. Para simplificar la validación, daremos como válidos aquellos códigos compuestos de 3 letras para el propietario, 1 letra para la categoría, 6 números y 1 dígito de control. Devuelve un DataFrame con los campos: ship_imo, container_id, propietario, categoria, numero_serie y digito_control.
---

In [20]:
sqlContext.sql("SELECT DISTINCT container_id FROM parquetFile ").show()

+------------+
|container_id|
+------------+
| DXTQ1407119|
| VXNB1938296|
| OVMU1118217|
| EAQO1539643|
| SGQH1799946|
| SWXT1708984|
| EDBR1562470|
| FCMB1487245|
| UBRI1681197|
| LFTG1322014|
| RAUX1713695|
| QKOJ1756061|
| JVFH1614514|
| KETX1362337|
| KUOG1927848|
| GSVC1467358|
| JXRI1226202|
| QZKL1853985|
| HDGM1122708|
| ZBJH1066313|
+------------+
only showing top 20 rows



Pruebas con el módulo de python 're'

In [21]:
test5 = sqlContext.sql("SELECT DISTINCT container_id FROM parquetFile ").take(2)
type(test5)
type(test5[0])
print(str(test5))

[Row(container_id='DXTQ1407119'), Row(container_id='VXNB1938296')]


In [22]:
containeridre = re.search(r'\w\w\w\w\d\d\d\d\d\d\d', str(test5[0]))

In [23]:
if containeridre:
    print('ez pz')
else:
    pass

ez pz


Función para validar contenedores

In [24]:
def validateID(containerID):
    containeridre = re.search(r'\w\w\w\w\d\d\d\d\d\d\d', str(containerID))
    #nonvalid = []
    if containeridre:
        return("Container ID valid")
    else:
        #nonvalid.append(str(containerID))
        return("----- Container ID not valid: " + str(containerID) + '-----')

def propietario(containerID):
    aux = list(str(containerID))
    return str(aux[18]+aux[19]+aux[20])

def categoria(containerID):
    aux = list(str(containerID))
    return str(aux[21])

def numeroserie(containerID):
    aux = list(str(containerID))
    return str(aux[22]+aux[23]+aux[24]+aux[25]+aux[26]+aux[27])

def digitocontrol(containerID):
    aux = list(str(containerID))
    return str(aux[28])

propietarioudf = udf(propietario, StringType())
categoriaudf = udf(categoria, StringType())
numeroserieudf = udf(numeroserie, StringType())
digitocontroludf = udf(digitocontrol, StringType())

In [25]:
data5 = sqlContext.sql("SELECT DISTINCT container_id FROM parquetFile ")

In [26]:
data5col = data5.select("container_id")

In [27]:
data5col.rdd.take(2)

[Row(container_id='DXTQ1407119'), Row(container_id='VXNB1938296')]

In [28]:
validateID('DXTQ1407119')

'Container ID valid'

In [29]:
data5.rdd.map(lambda row: validateID(str(row))).collect()

['Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container ID valid',
 'Container

Contenedores que no tienen un código válido

In [30]:
data5.rdd.map(lambda row: validateID(str(row))).filter(lambda x: str(x) != 'Container ID valid').collect()

['----- Container ID not valid: Row(container_id=None)-----',
 "----- Container ID not valid: Row(container_id='JMYG190Z978')-----",
 "----- Container ID not valid: Row(container_id='GJFL14A2798')-----",
 "----- Container ID not valid: Row(container_id='CTVU1506A832')-----",
 "----- Container ID not valid: Row(container_id='DUKF166276')-----"]

In [31]:
data5.take(2)

[Row(container_id='DXTQ1407119'), Row(container_id='VXNB1938296')]

Creando un nuevo dataframe con las columnas deseadas y eliminando los contenedores no válidos

In [32]:
sqlContext.sql("SELECT DISTINCT ship_imo, container_id, owner, container_group FROM parquetFile ").show()
data5 = sqlContext.sql("SELECT DISTINCT ship_imo, container_id, owner, container_group FROM parquetFile ")

+----------+------------+--------------------+---------------+
|  ship_imo|container_id|               owner|container_group|
+----------+------------+--------------------+---------------+
|POG1615575| CDGP1673672|            Haag LLC|           22VH|
|YZX1455509| RVNZ1574043|         Kuhic Group|           28VH|
|NCZ1777367| MCLQ1045130|     Casper and Sons|           28VH|
|FUS1202266| JNDM1523352|Reinger, VonRuede...|           28VH|
|POG1615575| DLCR1100460|Hegmann, Hintz an...|           42T0|
|NCZ1777367| HXNW1061510|       Sipes-Farrell|           42T0|
|GEU1548633| CHTX1389862|      Homenick-Haley|           22VH|
|MBV1836745| YBZA1370157|Weber, Senger and...|           42G0|
|JCI1797526| FMHW1096477|        Grimes Group|           28VH|
|GLV1922612| PJGZ1036450|Jerde, Collier an...|           28VH|
|YZX1455509|IJWDR1216916|Mante, Cruickshan...|           28VH|
|SQH1155999| DENF1086740|Kulas, Corkery an...|           42VH|
|FUS1202266| NSIT1499568|     Dietrich-Legros|         

In [33]:
data5 = data5.withColumn('categoria', lit(0))
data5 = data5.withColumn('numero_serie', lit(0))
data5 = data5.withColumn('digitocontrol', lit(0))
data5 = data5.withColumn('propietario', lit(0))
data5.show()

+----------+------------+--------------------+---------------+---------+------------+-------------+-----------+
|  ship_imo|container_id|               owner|container_group|categoria|numero_serie|digitocontrol|propietario|
+----------+------------+--------------------+---------------+---------+------------+-------------+-----------+
|POG1615575| CDGP1673672|            Haag LLC|           22VH|        0|           0|            0|          0|
|YZX1455509| RVNZ1574043|         Kuhic Group|           28VH|        0|           0|            0|          0|
|NCZ1777367| MCLQ1045130|     Casper and Sons|           28VH|        0|           0|            0|          0|
|FUS1202266| JNDM1523352|Reinger, VonRuede...|           28VH|        0|           0|            0|          0|
|POG1615575| DLCR1100460|Hegmann, Hintz an...|           42T0|        0|           0|            0|          0|
|NCZ1777367| HXNW1061510|       Sipes-Farrell|           42T0|        0|           0|            0|     

Comprobamos los resultados de las funciones previamente creadas

In [34]:
data5.select('container_id').rdd.map(lambda row: propietario(row)).take(1)

['CDG']

In [35]:
data5.select('container_id').rdd.map(lambda row: numeroserie(row)).take(1)

['167367']

In [36]:
data5.select('container_id').rdd.map(lambda row: categoria(row)).take(1)

['P']

In [37]:
data5.select('container_id').rdd.map(lambda row: digitocontrol(row)).take(1)

['2']

Aplicamos las udf creadas y formamos el dataframe

In [38]:
data5 = data5.withColumn('propietarios', propietarioudf(data5.container_id))
data5 = data5.withColumn('numero_serie', numeroserieudf(data5.container_id))
data5 = data5.withColumn('categoria', categoriaudf(data5.container_id))
data5 = data5.withColumn('digitocontrol', digitocontroludf(data5.container_id))

## Ejercicio 6. 
#### Extrae una lista con peso total de cada barco, net_weight, sumando cada contenedor y agrupado por los campos ship_imo y container_group. Devuelve un DataFrame con la siguiente estructura: ship_imo, ship_name, container, total_net_weight.
---

In [39]:
data6 = df

In [40]:
data6.take(2)

[Row(ship_imo='AMC1861710', ship_name='Jayden', country='BD', departure='201602183', container_id='FCUK1755843', container_type='4960', container_group='28VH', net_weight='44804866.62', gross_weight='2240243.33', owner='Streich-Wilkinson', declared='Music, Tools, Automotive & Health', contact='octavia@stammbednar.name', customs_ok='true'),
 Row(ship_imo='POG1615575', ship_name='Lake Eribertoland', country='CR', departure='2016021611', container_id='PDXW1549639', container_type='28VH', container_group='8888', net_weight='16681047.32', gross_weight='500431.42', owner='Senger and Sons', declared='Movies & Jewelery', contact='cindy.dubuque@roberts.org', customs_ok='true')]

In [41]:
data6new = data6.drop("country", "departure", "container_type", \
                      "container_group", "gross_weight", "owner", \
                      "declared", "customs_ok", "contact", "net_weight")

In [42]:
data6new.show()

+----------+------------------+------------+
|  ship_imo|         ship_name|container_id|
+----------+------------------+------------+
|AMC1861710|            Jayden| FCUK1755843|
|POG1615575| Lake Eribertoland| PDXW1549639|
|SQH1155999|            Aileen| PLKO1661930|
|JCI1797526|          Herminio| BXMT1827488|
|MBV1836745|Port Guiseppeburgh| JYIE1892741|
|GYR1192020|         Emardland| LARQ1499256|
|GLV1922612|           Eulalia| ARDX1463154|
|NLH1771681|       Port Noemie| JFPX1246669|
|FUS1202266|  East Mustafaland| ICAV1235470|
|GLV1922612|           Eulalia| KEVU1145768|
|IWE1254579|      North Creola| VDUQ1801278|
|JET1053895|             Jamil| CXZN1286843|
|KSP1096387|             Wiley| YAZN1142572|
|GYR1192020|         Emardland| HVFU1799048|
|GYR1192020|         Emardland| ROML1055099|
|JMP1637582|East Zechariahland| LONM1299749|
|TCU1641123|     New Margarete| XKAO1357085|
|MBV1836745|Port Guiseppeburgh| JYPA1889923|
|POG1615575| Lake Eribertoland| GKXC1181753|
|AEY110836

In [43]:
data6new = data6new.withColumn("total_net_weight", lit(0))

Test con la primera fila de datos

In [44]:
totalweight = sqlContext.sql("SELECT net_weight, gross_weight FROM parquetFile ")
totalweight.rdd.take(1)
float(totalweight.rdd.take(1)[0][0]) + float(totalweight.rdd.take(1)[0][1])

47045109.949999996

In [45]:
totalweight.rdd.map(lambda x: float(x[0][0]) + float(x[0][1]))

PythonRDD[154] at RDD at PythonRDD.scala:48

## Ejercicio 7.
#### Guarda los resultados del ejercicio anterior en formato Parquet.
---

In [None]:
df.write.parquet("ejercicios_6.parquet")

## Ejercicio 8.
#### ¿En qué casos crees que es más eficiente utilizar formatos como Parquet? ¿Existe alguna desventaja frente a formatos de texto como CSV?
---

Los formatos columnares facilitan el procesamiento, lectura y escritura de los motores como Spark, siendo hasta 10 veces más rápidos en leer los mismos datos almacenados en csv.

Un ejemplo práctico en el que observar las ventajas del archivos como parquet es el caso de Amazon Athenea, el cual es un servicio para realizar consultas SQL. ['Ejemplo de precios'](https://aws.amazon.com/es/athena/pricing/)

En el anterior enlace se explica como la misma consulta realizada sobre archivos parquet frente a csv permite reducir el coste de una consulta de 15€(sobre csv) a 1.67€(sobre parquet).

## Ejercicio 9.
#### ¿Es posible procesar XML mediante Spark? ¿Existe alguna restricción por la cual no sea eficiente procesar un único archivo en multiples nodos? ¿Se te ocurre alguna posible solución para trocear archivos suficientemente grandes? ¿Existe la misma problemática con otros formatos de texto como JSON?
---

Sería posible mediante un librería externa (spark.read.xml que viene de databricks)

## Ejercicio 10.
#### Spark SQL tiene una función denominada avg que se utiliza para calcular el promedio de un conjunto de valores ¿Por qué los autores han creado esta función en lugar de usar el API estándar de Python o Scala?
---

Para que pueda ser ejecutada de forma distribuida en un RDD.