In [None]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark
!pip install -q pyspark

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Ign:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release [697 B]
Hit:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:8 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release.gpg [836 B]
Hit:9 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Get:10 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ Packages [55.5 kB]
Hit:11 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Ge

In [None]:
import os
# Export the JDK and Spark install locations for the tools that read them.
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/content/spark-2.4.7-bin-hadoop2.7",
)

In [None]:
import findspark
# init() is called before the pyspark imports below; presumably it is what makes
# the distribution under SPARK_HOME importable, so keep this ordering.
findspark.init()
from pyspark.sql import SparkSession
from pyspark import SparkFiles

In [None]:
# Local session named "luizalabs" on master "local[*]"; getOrCreate() reuses an existing one.
spark = (
    SparkSession.builder
    .appName("luizalabs")
    .master("local[*]")
    .getOrCreate()
)

## Desafio 1

In [None]:
# Remote location of the text used as input for the word exercise.
url_wc = (
    "https://storage.googleapis.com/luizalabs-hiring-test/"
    "wordcount.txt"
)

In [None]:
# Register the remote file with Spark so it can be located through SparkFiles.get().
spark.sparkContext.addFile(path=url_wc)

In [None]:
# Read the local copy of the registered file as an RDD of text lines.
txt_rdd = spark.sparkContext.textFile(f"file://{SparkFiles.get('wordcount.txt')}")

In [None]:
# Tokenise on any run of whitespace. split() with no argument never yields empty
# strings, whereas split(" ") emits "" for blank lines and repeated spaces, which
# would later surface as a bogus zero-length "word".
# The 0 is only a placeholder for the "length" column that is computed further down.
wc = txt_rdd.flatMap(lambda line: line.split()).map(lambda word: (word, 0))

In [None]:
# Turn the (word, placeholder) pairs into a two-column DataFrame.
df = wc.toDF(schema=["word", "length"])

In [None]:
from pyspark.sql.functions import lower, col, udf
# Lower-case every word; the "length" column is passed through untouched.
df_lower = df.select(
    lower(df["word"]).alias("word"),
    df["length"],
)

In [None]:
# Preview the first 20 lower-cased rows.
df_lower.show(n=20)

+----------+------+
|      word|length|
+----------+------+
|hendrerit.|     0|
|    montes|     0|
|     purus|     0|
|    luctus|     0|
|   dictum,|     0|
|       est|     0|
|    mattis|     0|
|       est|     0|
| phasellus|     0|
| dignissim|     0|
|   rhoncus|     0|
|   cubilia|     0|
|       sit|     0|
|      nunc|     0|
|        at|     0|
|  interdum|     0|
|   vivamus|     0|
|    luctus|     0|
|      ante|     0|
|       ac.|     0|
+----------+------+
only showing top 20 rows



In [None]:
import re
from pyspark.sql.types import *
from pyspark.sql import functions as F


def remove_special_characters(column):
    """Return a Column with every run of characters outside ``[0-9a-zA-Z$]`` removed.

    Uses the built-in ``regexp_replace`` instead of a Python UDF: same pattern,
    no per-row Python round trip, and null input stays null instead of raising.
    """
    return F.regexp_replace(column, "[^0-9a-zA-Z$]+", "")


def count_characters(column):
    """Return an integer Column with the number of characters in ``column``.

    The previous UDF declared no return type, so its result was a string
    column; the built-in ``length`` yields a proper integer.
    """
    return F.length(column)


# Strip punctuation from each word; "length" is still the placeholder value here.
df_special_characters = df_lower.select(remove_special_characters(col("word")).alias("word"), col("length"))

In [None]:
# Preview the first 20 cleaned rows.
df_special_characters.show(n=20)

+---------+------+
|     word|length|
+---------+------+
|hendrerit|     0|
|   montes|     0|
|    purus|     0|
|   luctus|     0|
|   dictum|     0|
|      est|     0|
|   mattis|     0|
|      est|     0|
|phasellus|     0|
|dignissim|     0|
|  rhoncus|     0|
|  cubilia|     0|
|      sit|     0|
|     nunc|     0|
|       at|     0|
| interdum|     0|
|  vivamus|     0|
|   luctus|     0|
|     ante|     0|
|       ac|     0|
+---------+------+
only showing top 20 rows



In [None]:
# One row per distinct cleaned word together with its character count.
# Tokens made only of punctuation are "" after cleaning and are dropped so they
# do not show up as a zero-length word.
df_agg = (
    df_special_characters
    .filter(col("word") != "")
    .select("word")
    .distinct()
    # The cast keeps the count numeric even when count_characters returns
    # strings (which a UDF without a declared return type does).
    .withColumn("length", count_characters(col("word")).cast("int"))
)
df_agg.show()

+------------+------+
|        word|length|
+------------+------+
|   porttitor|     9|
|       curae|     5|
|        odio|     4|
|    sociosqu|     8|
|    volutpat|     8|
|    interdum|     8|
|     pretium|     7|
|   hendrerit|     9|
|    sagittis|     8|
|       netus|     5|
|sollicitudin|    12|
|       velit|     5|
|   hymenaeos|     9|
|       lorem|     5|
|         nam|     3|
|       vitae|     5|
|    molestie|     8|
|   penatibus|     9|
|         non|     3|
|    placerat|     8|
+------------+------+
only showing top 20 rows



In [None]:
# Keep only the words whose length is at most 10.
df_filter = df_agg.where(df_agg["length"] <= 10)
df_filter.show(n=20)

+---------+------+
|     word|length|
+---------+------+
|porttitor|     9|
|    curae|     5|
|     odio|     4|
| sociosqu|     8|
| volutpat|     8|
| interdum|     8|
|  pretium|     7|
|hendrerit|     9|
| sagittis|     8|
|    netus|     5|
|    velit|     5|
|hymenaeos|     9|
|    lorem|     5|
|      nam|     3|
|    vitae|     5|
| molestie|     8|
|penatibus|     9|
|      non|     3|
| placerat|     8|
|     quam|     4|
+---------+------+
only showing top 20 rows



In [None]:
# Number of rows in df_agg whose length exceeds 10.
maiores_que_10 = df_agg.where(df_agg["length"] > 10).count()

In [None]:
# Single-row payload: a fixed label followed by the count held in maiores_que_10.
maiores_que_10_list = [["maiores_que_10", maiores_que_10]]

In [None]:
# Wrap the summary row in a DataFrame that reuses the word/length column names.
df_maiores_que_10 = spark.createDataFrame(
    data=maiores_que_10_list,
    schema=["word", "length"],
)
df_maiores_que_10.show(n=20)

+--------------+------+
|          word|length|
+--------------+------+
|maiores_que_10|     7|
+--------------+------+



In [None]:
# Append the summary row to the per-word rows. unionByName pairs columns by
# name rather than by position, so a change in column order on either side
# cannot silently mix "word" and "length".
df_final = df_filter.unionByName(df_maiores_que_10)

In [None]:
# Write the result as one comma-separated part file with a header row,
# replacing any previous output at the same path.
(
    df_final
    .repartition(1)
    .write
    .csv("count_letters.csv", mode="overwrite", sep=",", header="true")
)

## Desafio 2

In [None]:
# Remote location of the customers/orders CSV.
url_pedidos = (
    "https://storage.googleapis.com/luizalabs-hiring-test/"
    "clientes_pedidos.csv"
)

In [None]:
# Register the remote CSV with Spark so it can be located through SparkFiles.get().
spark.sparkContext.addFile(path=url_pedidos)

In [None]:
# Load the local copy of the CSV; the first line is the header and fields are comma-separated.
df_clientes_pedidos = (
    spark.read
    .option("header", True)
    .option("sep", ",")
    .csv("file://" + SparkFiles.get("clientes_pedidos.csv"))
)
df_clientes_pedidos.show(n=20)

+--------------------+--------------------+-----------------------+-----------+
|       codigo_pedido|      codigo_cliente|data_nascimento_cliente|data_pedido|
+--------------------+--------------------+-----------------------+-----------+
|bc8b03a005d5bf742...|b07af86a4a6870737...|    1985-12-04 00:00:00| 1542974527|
|19b0583adf75322cc...|eaaf6b26ef3b9712e...|    1979-11-14 00:00:00| 1542998573|
|58fafb698b6d343e0...|c69f2ab5fc61484d7...|    1989-07-25 00:00:00| 1543007822|
|79dd9f6c88ba32c97...|b4067845511997517...|    1953-12-14 00:00:00| 1542966096|
|968806d40adf6aa8c...|7eecbc06bfec32b80...|    1985-05-03 00:00:00| 1543000756|
|b8c6e74cf1b462489...|f240c43e82dfe3ca0...|    1980-04-16 00:00:00| 1542993637|
|5d91ea3b69a22d55b...|ae03fddbb707cb739...|    1991-11-18 00:00:00| 1542996459|
|5e48766af0ffdfb60...|156f291101e8ba36b...|    1974-01-04 00:00:00| 1543004877|
|f85c3fdcb417a3431...|4cf8efefa3c5aef67...|    1985-04-18 00:00:00| 1542997957|
|8ff2a83de50bb0695...|772b6ad0ae61400f6.

In [None]:
from datetime import date
from datetime import datetime

In [None]:
def calculate_age(born, today=None):
    """Return the age in completed years of someone born on ``born``.

    Args:
        born: Birth date; any object exposing ``year``, ``month`` and ``day``.
        today: Date at which the age is measured. Defaults to ``date.today()``
            (the original behaviour); pass a fixed date to get results that do
            not change with the day the notebook is executed.

    Returns:
        int: Whole years elapsed between ``born`` and ``today``.
    """
    if today is None:
        today = date.today()
    # The tuple comparison is True (counted as 1) while this year's birthday is
    # still ahead, in which case one year is subtracted.
    birthday_pending = (today.month, today.day) < (born.month, born.day)
    return today.year - born.year - birthday_pending

In [None]:
from datetime import timezone
from pyspark.sql.types import IntegerType, StringType

# Both UDFs declare their return type (a UDF without one produces strings) and
# pass nulls through instead of raising inside the Python worker.
convert_age = udf(
    lambda x: None if x is None else calculate_age(datetime.fromisoformat(x).date()),
    IntegerType(),
)
# The epoch value is interpreted in UTC so the resulting date does not depend on
# the timezone of the machine running the notebook.
# NOTE(review): confirm whether order dates should instead be taken in a local
# business timezone.
convert_timestamp = udf(
    lambda x: None if x is None else str(datetime.fromtimestamp(int(x), tz=timezone.utc).date()),
    StringType(),
)

# Replace the birth date with an age ("idade") and the epoch seconds with a
# YYYY-MM-DD string; the identifier columns are carried over unchanged.
df_pedidos_with_date = df_clientes_pedidos.select(
    col("codigo_pedido"),
    col("codigo_cliente"),
    convert_age(col("data_nascimento_cliente")).alias("idade"),
    convert_timestamp(col("data_pedido")).alias("data_pedido"),
)

In [None]:
# Preview the first 20 rows with the derived age and order date.
df_pedidos_with_date.show(n=20)

+--------------------+--------------------+-----+-----------+
|       codigo_pedido|      codigo_cliente|idade|data_pedido|
+--------------------+--------------------+-----+-----------+
|bc8b03a005d5bf742...|b07af86a4a6870737...|   35| 2018-11-23|
|19b0583adf75322cc...|eaaf6b26ef3b9712e...|   41| 2018-11-23|
|58fafb698b6d343e0...|c69f2ab5fc61484d7...|   31| 2018-11-23|
|79dd9f6c88ba32c97...|b4067845511997517...|   67| 2018-11-23|
|968806d40adf6aa8c...|7eecbc06bfec32b80...|   36| 2018-11-23|
|b8c6e74cf1b462489...|f240c43e82dfe3ca0...|   41| 2018-11-23|
|5d91ea3b69a22d55b...|ae03fddbb707cb739...|   29| 2018-11-23|
|5e48766af0ffdfb60...|156f291101e8ba36b...|   47| 2018-11-23|
|f85c3fdcb417a3431...|4cf8efefa3c5aef67...|   36| 2018-11-23|
|8ff2a83de50bb0695...|772b6ad0ae61400f6...|   39| 2018-11-23|
|c96bcb263b22c5640...|013590f6b3bcbc02c...|   25| 2018-11-23|
|b3a20e4bfb3799d70...|52c69e3a573310818...|   50| 2018-11-23|
|f87281ade12857969...|3d3ad8389c5262afd...|   56| 2018-11-23|
|01674a4

In [None]:
# Reference dates, as YYYY-MM-DD strings, that "data_pedido" is compared against;
# presumably the Black Friday of each year — TODO confirm.
bf_2017 = "2017-11-24"
bf_2018 = "2018-11-23"

In [None]:
from pyspark.sql.functions import collect_list
from pyspark.sql.functions import concat_ws, concat, lit

In [None]:
# Customers with more than two orders placed on either of the two reference dates.
df_bf = (
    df_pedidos_with_date
    .where(col("data_pedido").isin(bf_2017, bf_2018))
    .groupBy("codigo_cliente")
    .count()
    .where(col("count") > 2)
)

In [None]:
# Per customer (and age), render every order as "[codigo_pedido, data_pedido]"
# and join those fragments into one comma-separated string.
df_lista_pedidos = (
    df_pedidos_with_date
    .groupBy("codigo_cliente", "idade")
    .agg(
        concat_ws(
            ", ",
            collect_list(
                concat(
                    lit("["),
                    concat_ws(", ", col("codigo_pedido"), col("data_pedido")),
                    lit("]"),
                )
            ),
        ).alias("lista_pedidos")
    )
)

df_lista_pedidos.show(n=20, truncate=False)

+--------------------------------+-----+----------------------------------------------------------------------------------------------+
|codigo_cliente                  |idade|lista_pedidos                                                                                 |
+--------------------------------+-----+----------------------------------------------------------------------------------------------+
|00711c564e25613a9281ed955cc37a26|69   |[3768e44e64d849037460a284b0474200, 2018-11-23]                                                |
|0080ec07c29842e585b0178df7578bf5|31   |[256a770e065be6d11b3ae637c95b0607, 2018-11-23]                                                |
|00bef32916008fcce86ec95db54dc994|61   |[d3d1b1681dfba88fda386013e6745475, 2017-11-01], [c9ae109d2963de5ae83e1bcdeb3533ec, 2017-11-01]|
|01217152ec6a750d888d49bd6a7033be|55   |[40aa57266ca96666781aeb8178a17e9d, 2018-11-03]                                                |
|0133726c9e23d25603b3cb46e0bd4030|35   |[c075b80

In [None]:
# Preview the first 20 qualifying customers and their order counts.
df_bf.show(n=20)

+--------------------+-----+
|      codigo_cliente|count|
+--------------------+-----+
|b67ef7abecc0a8e88...|    3|
|28688f66084a7f1de...|    3|
|3f89d915a06a3d01e...|    3|
|c371799c2befffb67...|    3|
|e7c52e68263476a2d...|    3|
|3bfcd49a281054bbf...|    3|
|f184a197ec54c7eb1...|    3|
|1375a4e01d4811249...|    3|
|3531fd9696a342b74...|    4|
|bd0455c549e900b23...|    3|
|4cf0d3732731a0653...|    3|
|c512a4d48ee9388e1...|    3|
|057ae5d7ef3fcdd74...|    3|
|d2b6a6676c81b7e84...|    3|
|e40809b3ff1805b54...|    4|
|c3f1cb573d199ae93...|    4|
|18d6c857acbfb21b5...|    3|
|3860f681456fae15d...|    3|
|fcf0d288e3488a3ab...|    3|
|01b9f95dce03e2382...|    3|
+--------------------+-----+
only showing top 20 rows



In [None]:
# Attach the per-customer order count to the order list and keep customers under 30.
# The join condition goes through the "a"/"b" aliases: both inputs derive from
# df_pedidos_with_date, so referring to the parent DataFrames' codigo_cliente
# columns can point at the same underlying attribute on both sides (an ambiguous
# self-join reference).
df_pedidos_with_count = (
    df_lista_pedidos.alias("a")
    .join(
        df_bf.alias("b"),
        col("a.codigo_cliente") == col("b.codigo_cliente"),
        how="inner",
    )
    .select("a.codigo_cliente", "a.idade", "a.lista_pedidos", "b.count")
    .filter(col("idade") < 30)
)

In [None]:
# Preview the first 20 joined rows.
df_pedidos_with_count.show(n=20)

+--------------------+-----+--------------------+-----+
|      codigo_cliente|idade|       lista_pedidos|count|
+--------------------+-----+--------------------+-----+
|38f1d0ad2967e6cf1...|   21|[acfdeb3c3cfd98b2...|    3|
|919c5078c124581e2...|   26|[e42f8973bd5b9aa8...|    3|
|3224ac7dee3bd6196...|   21|[110fdb292cc6b2cc...|    3|
|8ca15b56dca62d6b9...|   26|[4e6a62283d39c1c5...|    3|
|55a021531f7493737...|   23|[0ab3b13962c74a61...|    3|
|f8c3dababb4fdf6bd...|   25|[4a2410f36334cd4b...|    3|
|f0b083150ce114387...|   26|[8d47e8b849944d95...|    3|
|a6b0fe6ef5ebf35a7...|   26|[4e66698cdd595200...|    3|
|79d7089bd009dddb7...|   27|[9d5d15040ada9530...|    3|
|dcd6e2afc7e67c4fa...|   24|[0dd83caa7e723cf4...|    3|
|d786fd2f6005eab61...|   27|[0f60c7be937aab94...|    3|
|0cf936eed633e9892...|   25|[fabf84630a8a5936...|    3|
|4112bda7877566650...|   24|[08d28852abb06109...|    3|
|a18363603cedb08e9...|   28|[a4f400eeb4813edb...|   38|
|6e41c9257d0c33081...|   29|[45ca94c92bcf3672...

In [None]:
# Final column order for the export; "count" is exposed as "numero_pedidos".
df_final_2 = df_pedidos_with_count.select(
    "codigo_cliente",
    "lista_pedidos",
    col("count").alias("numero_pedidos"),
    "idade",
)

In [None]:
# Preview the first 20 rows of the export.
df_final_2.show(n=20)

+--------------------+--------------------+--------------+-----+
|      codigo_cliente|       lista_pedidos|numero_pedidos|idade|
+--------------------+--------------------+--------------+-----+
|38f1d0ad2967e6cf1...|[acfdeb3c3cfd98b2...|             3|   21|
|919c5078c124581e2...|[e42f8973bd5b9aa8...|             3|   26|
|3224ac7dee3bd6196...|[110fdb292cc6b2cc...|             3|   21|
|8ca15b56dca62d6b9...|[4e6a62283d39c1c5...|             3|   26|
|55a021531f7493737...|[0ab3b13962c74a61...|             3|   23|
|f8c3dababb4fdf6bd...|[4a2410f36334cd4b...|             3|   25|
|f0b083150ce114387...|[8d47e8b849944d95...|             3|   26|
|a6b0fe6ef5ebf35a7...|[4e66698cdd595200...|             3|   26|
|79d7089bd009dddb7...|[9d5d15040ada9530...|             3|   27|
|dcd6e2afc7e67c4fa...|[0dd83caa7e723cf4...|             3|   24|
|d786fd2f6005eab61...|[0f60c7be937aab94...|             3|   27|
|0cf936eed633e9892...|[fabf84630a8a5936...|             3|   25|
|4112bda7877566650...|[08

In [None]:
# Write the result as one comma-separated part file with a header row,
# replacing any previous output at the same path.
(
    df_final_2
    .repartition(1)
    .write
    .csv("clients_orders_bf.csv", mode="overwrite", sep=",", header="true")
)