In [3]:
from pymongo import MongoClient

In [4]:
# Direct pymongo handle to the MongoDB service at host "mongodb", port 27017.
# NOTE(review): `client` is not referenced again in the visible cells; the
# Spark connector below is given its own URI.
client = MongoClient(host="mongodb://mongodb:27017/")

In [5]:
from pyspark.sql import SparkSession
# Create (or reuse) the Spark session. `spark.jars.packages` asks Spark to
# fetch the MongoDB connector artifact (Scala 2.11 build, version 2.3.2).
spark = (
    SparkSession.builder
    .appName("myApp")
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.11:2.3.2")
    .getOrCreate()
)

In [6]:
# Load the `sars` collection of the `epidemics` database as a Spark DataFrame
# through the MongoDB connector.
sars = (
    spark.read
    .format("com.mongodb.spark.sql.DefaultSource")
    .option("uri", "mongodb://mongodb:27017/epidemics.sars")
    .load()
)

In [7]:
# Expose the loaded DataFrame to Spark SQL under the view name "sars".
sars.createOrReplaceTempView(name="sars")

In [78]:
# Derive per-country daily figures from the cumulative columns of view "sars":
#   * date         -> formatted as 'yyyy-MM-dd'. Lowercase 'y' is the calendar
#                     year; uppercase 'Y' is the week-based year, which can
#                     print the wrong year for dates around New Year (and is
#                     rejected by newer Spark versions).
#   * deaths/cases -> difference with the previous row of the same country
#                     (ordered by date). lag()'s default is the current value,
#                     so the first row of each country gets a delta of 0.
#   * cumul_deaths / cumul_cases -> the original cumulative values.
#   * row_num_date -> 1-based position of the row within its country, by date.
# The window ORDER BY clauses still sort on the source `date` column.
sars = spark.sql(
    "SELECT _id, country, date_format(date, 'yyyy-MM-dd') as date, "
    "int((deaths - lag(deaths,1, deaths) over (partition by country ORDER BY date asc))) as deaths, "
    "deaths as cumul_deaths, recovered, "
    "int((cumul_nb_cases - lag(cumul_nb_cases,1, cumul_nb_cases) over (partition by country ORDER BY date asc))) as cases, "
    "cumul_nb_cases as cumul_cases, "
    "row_number() over (partition by country ORDER BY date asc) as row_num_date FROM sars"
)

In [79]:
# Preview the derived daily figures (first 20 rows, long cells truncated).
sars.show(n=20, truncate=True)

+--------------------+-------+----------+------+------------+---------+-----+-----------+------------+
|                 _id|country|      date|deaths|cumul_deaths|recovered|cases|cumul_cases|row_num_date|
+--------------------+-------+----------+------+------------+---------+-----+-----------+------------+
|[5eaf225b9754f390...| Sweden|2003-04-14|     0|           0|        0|    0|          1|           1|
|[5eaf225b9754f390...| Sweden|2003-04-15|     0|           0|        0|    0|          1|           2|
|[5eaf225b9754f390...| Sweden|2003-04-16|     0|           0|        0|    0|          1|           3|
|[5eaf225b9754f390...| Sweden|2003-04-17|     0|           0|        0|    0|          1|           4|
|[5eaf225b9754f390...| Sweden|2003-04-18|     0|           0|        0|    2|          3|           5|
|[5eaf225b9754f390...| Sweden|2003-04-19|     0|           0|        0|    0|          3|           6|
|[5eaf225b9754f390...| Sweden|2003-04-21|     0|           0|        0|  

In [80]:
# Register the current `sars` DataFrame as the SQL view "sars2".
sars.createOrReplaceTempView(name="sars2")

In [81]:
# Number the rows of each country by ascending cumulative case count
# (row_num_cumul), keeping the other columns of view "sars2" alongside.
sars2 = spark.sql(
    "select row_number() over (partition by country order by int(cumul_cases) asc) as row_num_cumul, "
    "_id, country, date, deaths, cumul_deaths, recovered, cases, "
    "cumul_cases from sars2"
)

In [82]:
# Register the cumul-ranked DataFrame as the SQL view "sars3".
sars2.createOrReplaceTempView(name="sars3")

In [83]:
# Preview the contents of view "sars3".
(
    spark.sql("SELECT * FROM sars3")
    .show()
)

+-------------+--------------------+-------+----------+------+------------+---------+-----+-----------+
|row_num_cumul|                 _id|country|      date|deaths|cumul_deaths|recovered|cases|cumul_cases|
+-------------+--------------------+-------+----------+------+------------+---------+-----+-----------+
|            1|[5eaf225b9754f390...| Sweden|2003-04-14|     0|           0|        0|    0|          1|
|            2|[5eaf225b9754f390...| Sweden|2003-04-15|     0|           0|        0|    0|          1|
|            3|[5eaf225b9754f390...| Sweden|2003-04-16|     0|           0|        0|    0|          1|
|            4|[5eaf225b9754f390...| Sweden|2003-04-17|     0|           0|        0|    0|          1|
|            5|[5eaf225b9754f390...| Sweden|2003-04-18|     0|           0|        0|    2|          3|
|            6|[5eaf225b9754f390...| Sweden|2003-04-19|     0|           0|        0|    0|          3|
|            7|[5eaf225b9754f390...| Sweden|2003-04-21|     0|  

In [84]:
# Build a lookup from view "sars3": key `cle2` is the cumul-rank concatenated
# with the country (e.g. "1Sweden"); value is the cumulative case count at
# that rank, exposed as `cumul_cases_asc`.
sars3 = spark.sql(
    "SELECT CONCAT(row_num_cumul, country) as cle2,"
    "cumul_cases as cumul_cases_asc "
    "FROM sars3"
)

In [85]:
# Preview the (cle2, cumul_cases_asc) lookup: first 20 rows.
sars3.show(n=20, truncate=True)

+--------+---------------+
|    cle2|cumul_cases_asc|
+--------+---------------+
| 1Sweden|              1|
| 2Sweden|              1|
| 3Sweden|              1|
| 4Sweden|              1|
| 5Sweden|              3|
| 6Sweden|              3|
| 7Sweden|              3|
| 8Sweden|              3|
| 9Sweden|              3|
|10Sweden|              3|
|11Sweden|              3|
|12Sweden|              3|
|13Sweden|              3|
|14Sweden|              3|
|15Sweden|              3|
|16Sweden|              3|
|17Sweden|              3|
|18Sweden|              3|
|19Sweden|              3|
|20Sweden|              3|
+--------+---------------+
only showing top 20 rows



In [43]:
#sars3.write.format("com.mongodb.spark.sql.DefaultSource").option("uri","mongodb://mongodb:27017/epidemics2.epidemics_sars23").save()

In [86]:
# Register the current `sars` DataFrame as the SQL view "sars4".
sars.createOrReplaceTempView(name="sars4")

In [87]:
# From view "sars4", keep the reporting columns and add the join key `cle1`:
# the date-ordered row number concatenated with the country.
sars4 = spark.sql(
    "SELECT _id, country, date, deaths, cumul_deaths, recovered, cases, cumul_cases, "
    "CONCAT(row_num_date, country) as cle1 "
    "FROM sars4"
)

In [88]:
# Register the date-keyed DataFrame as the SQL view "sars5".
sars4.createOrReplaceTempView(name="sars5")

In [89]:
# Register the (cle2, cumul_cases_asc) lookup as the SQL view "sarsCle".
sars3.createOrReplaceTempView(name="sarsCle")

In [90]:
# Preview the contents of view "sarsCle".
(
    spark.sql("select * from sarsCle")
    .show()
)

+--------+---------------+
|    cle2|cumul_cases_asc|
+--------+---------------+
| 1Sweden|              1|
| 2Sweden|              1|
| 3Sweden|              1|
| 4Sweden|              1|
| 5Sweden|              3|
| 6Sweden|              3|
| 7Sweden|              3|
| 8Sweden|              3|
| 9Sweden|              3|
|10Sweden|              3|
|11Sweden|              3|
|12Sweden|              3|
|13Sweden|              3|
|14Sweden|              3|
|15Sweden|              3|
|16Sweden|              3|
|17Sweden|              3|
|18Sweden|              3|
|19Sweden|              3|
|20Sweden|              3|
+--------+---------------+
only showing top 20 rows



In [91]:
# Join the date-keyed rows (view "sars5", key cle1) with the cumul-ranked
# lookup (view "sarsCle", key cle2). Each row gains `cumul_cases_asc`, the
# cumulative case count found at the same rank when the country's counts are
# sorted ascending. The result is ordered by country, then date.
sars5 = spark.sql(
    "SELECT cle2, _id, country, date, deaths, cumul_deaths, recovered, cases, cumul_cases, cumul_cases_asc "
    "FROM sars5 JOIN sarsCle ON sars5.cle1=sarsCle.cle2 "
    "ORDER BY country, date asc"
)

In [92]:
# Persist the joined result to collection `epidemics_sars28` of database
# `epidemics2` through the MongoDB connector.
# NOTE(review): no save mode is set, so Spark's default applies; presumably a
# re-run fails once the collection exists. Confirm before relying on Run All.
(
    sars5.write
    .format("com.mongodb.spark.sql.DefaultSource")
    .option("uri", "mongodb://mongodb:27017/epidemics2.epidemics_sars28")
    .save()
)