In [1]:
#
###### https://github.com/LucaCanali/Miscellaneous/blob/master/Pyspark_SQL_Magic_Jupyter/IPython_Pyspark_SQL_Magic.py
#
#
# IPython magic functions to use with Pyspark and Spark SQL
# The following code is intended as examples of shorcuts to simplify the use of SQL in pyspark
# The defined functions are:
#
# %sql <statement>          - return a Spark DataFrame for lazy evaluation of the SQL
# %sql_show <statement>     - run the SQL statement and show max_show_lines (50) lines
# %sql_display <statement>  - run the SQL statement and display the results using a HTML table 
#                           - this is implemented passing via Pandas and displays up to max_show_lines (50)
# %sql_explain <statement>  - display the execution plan of the SQL statement
#
# Use: %<magic> for line magic or %%<magic> for cell magic.
#
# Author: Luca.Canali@cern.ch
# September 2016
#

from IPython.core.magic import register_line_cell_magic

# Configuration parameters
max_show_lines = 50         # Limit on the number of lines to show with %sql_show and %sql_display
detailed_explain = True     # Set to False if you want to see only the physical plan when running explain


@register_line_cell_magic
def sql(line, cell=None):
    "Return a Spark DataFrame for lazy evaluation of the sql. Use: %sql or %%sql"
    val = cell if cell is not None else line 
    return sqlContext.sql(val)

@register_line_cell_magic
def sql_show(line, cell=None):
    "Execute sql and show the first max_show_lines lines. Use: %sql_show or %%sql_show"
    val = cell if cell is not None else line 
    return sqlContext.sql(val).show(max_show_lines) 

@register_line_cell_magic
def sql_display(line, cell=None):
    """Execute sql and convert results to Pandas DataFrame for pretty display or further processing.
    Use: %sql_display or %%sql_display"""
    val = cell if cell is not None else line 
    return sqlContext.sql(val).limit(max_show_lines).toPandas() 

@register_line_cell_magic
def sql_explain(line, cell=None):
    "Display the execution plan of the sql. Use: %sql_explain or %%sql_explain"
    val = cell if cell is not None else line 
    return sqlContext.sql(val).explain(detailed_explain)
#

In [2]:
##############################
###### Load The Delta   ######
##############################
###
### Input delta in folder :  /data
anldate="2020-10-27"
my_input_csv_table="World_v2--Confirmed-1DayForecast--train_"+anldate+"-Copy1.csv"
delta_location_covid19="file:///home/notebookuser/notebooks/data/delta_daily-covid19-global-analysis/"
###
######
##############################Execution##########################
import findspark
findspark.init()
#
import pyspark
from pyspark.sql import functions as pfunc
from pyspark.sql import SQLContext
from pyspark.sql import Window, types
import re
import pandas as pd
import numpy as np
from pandas import DataFrame
from pyspark.sql.types import IntegerType
from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf
from pyspark.sql.functions import *
from scipy.stats import kstest
from scipy import stats
#
import subprocess
#
sc = pyspark.SparkContext(appName="Daily-Covid19-Global-Analysis")
sqlContext = SQLContext(sc)
#
#
internal_csv_files="file:///home/notebookuser/notebooks/covid19/data/"+my_input_csv_table
#
# Join with Internal Curation Data in urltopredict staged folder
from pyspark.sql import functions as F
### Scructure
from pyspark.sql.types import StructField,IntegerType,StructType,StringType,FloatType,DoubleType
newDF=[StructField('date',StringType(),True),
       StructField('region',StringType(),True),
       StructField('confirmed',StringType(),True),
       StructField('1_day_change',FloatType(),True),
       StructField('3_day_change',FloatType(),True),
       StructField('7_day_change',FloatType(),True),
       StructField('1_day_change_rate',DoubleType(),True),
       StructField('3_day_change_rate',DoubleType(),True),
       StructField('7_day_change_rate',DoubleType(),True),
       StructField('confirmed_yesterday',FloatType(),True),
       StructField('confirmed_prediction',FloatType(),True),
       StructField('population',FloatType(),True),
       StructField('infected_rate',DoubleType(),True),
       StructField('min',FloatType(),True),
       StructField('max',FloatType(),True),
       StructField('population_percentage_infected_rate_confirmed',DoubleType(),True),
       StructField('population_percentage_factor_9a10_infected_rate_confirmed',DoubleType(),True),
       StructField('delta_new_cases',FloatType(),True),
       StructField('delta_new_cases_per_1M_hab',FloatType(),True),
       StructField('delta_roling_7day_AVG',FloatType(),True),
       StructField('delta_aprox_14day_case_notification_rate_per_100k_hab',FloatType(),True)
       ]
finalStruct=StructType(fields=newDF)
#
dataframe_df1=sqlContext.read.csv(internal_csv_files,header="true",schema=finalStruct)\
.fillna("",subset=['date', 'region'])\
.fillna(0, subset=['1_day_change', '3_day_change', '7_day_change', 'confirmed_yesterday', 'confirmed_prediction', 'population', 'infected_rate', 'min', 'max', 'population_percentage_infected_rate_confirmed', 'population_percentage_factor_9a10_infected_rate_confirmed', 'population_percentage_factor_9a10_infected_rate_confirmed', 'delta_new_cases', 'delta_new_cases_per_1M_hab', 'delta_roling_7day_AVG', 'delta_aprox_14day_case_notification_rate_per_100k_hab'])\
.persist(pyspark.StorageLevel.MEMORY_AND_DISK_2)
#
## as confirmed is null in "today's date" it created a conflict wirh spark hashing
###
##Py4JJavaError: An error occurred while calling o69.collectToPython.
##: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 14.0 failed 1 times, most recent failure: Lost task 0.0 in stage 14.0 (TID 258, localhost, executor driver): org.apache.hadoop.fs.ChecksumException: Checksum error: file:/home/notebookuser/notebooks/data/delta_daily-covid19-global-analysis/part-00002-eb251372-48d4-46e2-a41c-36ec8e3284e0-c000.snappy.parquet at 0 exp: 4760148 got: -1297451656
##	at org.apache.hadoop.fs.FSInputChecker.verifySums(FSInputChecker.java:323)
##	at org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:279)
##	at org.apache.hadoop.fs.FSInputChecker.fill(FSInputChecker.java:214)
##	at org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:232)
###    
columns_to_drop = ['1_day_change_rate', '3_day_change_rate', '7_day_change_rate', 'infected_rate', 'min', 'max']
#
dataframe_df1.printSchema()
dataframe_df1.drop(*columns_to_drop).show(5)
df2=dataframe_df1.drop(*columns_to_drop)
df2.write.mode('overwrite').format("delta").save(delta_location_covid19)
###
####
#sc.stop()
#
print("Data Load Done!")
#

root
 |-- date: string (nullable = false)
 |-- region: string (nullable = false)
 |-- confirmed: string (nullable = true)
 |-- 1_day_change: float (nullable = false)
 |-- 3_day_change: float (nullable = false)
 |-- 7_day_change: float (nullable = false)
 |-- 1_day_change_rate: double (nullable = true)
 |-- 3_day_change_rate: double (nullable = true)
 |-- 7_day_change_rate: double (nullable = true)
 |-- confirmed_yesterday: float (nullable = false)
 |-- confirmed_prediction: float (nullable = false)
 |-- population: float (nullable = false)
 |-- infected_rate: double (nullable = false)
 |-- min: float (nullable = false)
 |-- max: float (nullable = false)
 |-- population_percentage_infected_rate_confirmed: double (nullable = false)
 |-- population_percentage_factor_9a10_infected_rate_confirmed: double (nullable = false)
 |-- delta_new_cases: float (nullable = false)
 |-- delta_new_cases_per_1M_hab: float (nullable = false)
 |-- delta_roling_7day_AVG: float (nullable = false)
 |-- delta_a

In [3]:
##############################
###### Load The Delta   ######
##############################
###
### Input delta in folder :  /data 
my_input_delta_table="delta_daily-covid19-global-analysis"
###
######
##############################Execution##########################
import findspark
findspark.init()
#
import pyspark
from pyspark.sql import functions as pfunc
from pyspark.sql import SQLContext
from pyspark.sql import Window, types
import re
import pandas as pd
import numpy as np
from pandas import DataFrame
from pyspark.sql.types import IntegerType
from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf
from pyspark.sql.functions import *
from scipy.stats import kstest
from scipy import stats
#
import subprocess
#
#sc = pyspark.SparkContext(appName="Daily-Covid19-Analysis-Delta")
#sqlContext = SQLContext(sc)
#
#
internal_delta_files="file:///home/notebookuser/notebooks/data/"+my_input_delta_table
#
# Join with Internal Curation Data in urltopredict staged folder
from pyspark.sql import functions as F
delta_dataframe_df1=sqlContext.read.format("delta").load(internal_delta_files)\
.persist(pyspark.StorageLevel.MEMORY_AND_DISK_2)
#
## ,schema=finalStruct)\
## Py4JJavaError: An error occurred while calling o48.load.
## : org.apache.spark.sql.AnalysisException: delta does not allow user-specified schemas.;
##      	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:325)
#
delta_dataframe_df1.printSchema()
delta_dataframe_df1.registerTempTable("covid19_jh_analysis")
#
#
print("Delta temp table registration Done!")
#

root
 |-- date: string (nullable = true)
 |-- region: string (nullable = true)
 |-- confirmed: string (nullable = true)
 |-- 1_day_change: float (nullable = true)
 |-- 3_day_change: float (nullable = true)
 |-- 7_day_change: float (nullable = true)
 |-- confirmed_yesterday: float (nullable = true)
 |-- confirmed_prediction: float (nullable = true)
 |-- population: float (nullable = true)
 |-- population_percentage_infected_rate_confirmed: double (nullable = true)
 |-- population_percentage_factor_9a10_infected_rate_confirmed: double (nullable = true)
 |-- delta_new_cases: float (nullable = true)
 |-- delta_new_cases_per_1M_hab: float (nullable = true)
 |-- delta_roling_7day_AVG: float (nullable = true)
 |-- delta_aprox_14day_case_notification_rate_per_100k_hab: float (nullable = true)

Delta temp table registration Done!


In [4]:
%%sql_display
select * from covid19_jh_analysis  order by date desc limit 5

Unnamed: 0,date,region,confirmed,1_day_change,3_day_change,7_day_change,confirmed_yesterday,confirmed_prediction,population,population_percentage_infected_rate_confirmed,population_percentage_factor_9a10_infected_rate_confirmed,delta_new_cases,delta_new_cases_per_1M_hab,delta_roling_7day_AVG,delta_aprox_14day_case_notification_rate_per_100k_hab
0,2020-10-27,Rwanda_nan,0,7.0,21.0,81.0,5073.0,5090.0,10000000.0,0.000507,0.005074,17.0,1.7,11.571428,1.62
1,2020-10-27,San Marino_nan,0,33.0,33.0,86.0,852.0,855.0,33916.0,0.02515,0.251496,3.0,88.453827,12.285714,507.135284
2,2020-10-27,Saint Kitts and Nevis_nan,0,0.0,0.0,0.0,19.0,19.0,10000000.0,2e-06,2e-05,0.0,0.0,0.0,0.0
3,2020-10-27,Qatar_nan,0,262.0,721.0,1761.0,131432.0,131683.0,2870256.0,0.045791,0.457914,251.0,87.448647,251.571426,122.706825
4,2020-10-27,Saint Lucia_nan,0,9.0,15.0,27.0,63.0,64.0,10000000.0,6e-06,6.4e-05,1.0,0.1,3.857143,0.54


In [5]:
%%sql_display
select * from covid19_jh_analysis  order by date limit 5

Unnamed: 0,date,region,confirmed,1_day_change,3_day_change,7_day_change,confirmed_yesterday,confirmed_prediction,population,population_percentage_infected_rate_confirmed,population_percentage_factor_9a10_infected_rate_confirmed,delta_new_cases,delta_new_cases_per_1M_hab,delta_roling_7day_AVG,delta_aprox_14day_case_notification_rate_per_100k_hab
0,2020-01-22,Rwanda_nan,0,0.0,0.0,0.0,0.0,0.0,10000000.0,9.999999e-08,9.999999e-07,0.0,0.0,0.0,0.0
1,2020-01-22,San Marino_nan,0,0.0,0.0,0.0,0.0,0.0,33916.0,2.948374e-05,0.0002948374,0.0,0.0,0.0,0.0
2,2020-01-22,Saint Kitts and Nevis_nan,0,0.0,0.0,0.0,0.0,0.0,10000000.0,9.999999e-08,9.999999e-07,0.0,0.0,0.0,0.0
3,2020-01-22,Russia_nan,0,0.0,0.0,0.0,0.0,0.0,144500000.0,6.920415e-09,6.920415e-08,0.0,0.0,0.0,0.0
4,2020-01-22,Saint Lucia_nan,0,0.0,0.0,0.0,0.0,0.0,10000000.0,9.999999e-08,9.999999e-07,0.0,0.0,0.0,0.0


### The Regions in the World with +80 'notifications per 100k hab. in last 14days' and +300 cases yesterday

In [7]:
%%sql_display
select * from covid19_jh_analysis where region is not null and
    date >= '2020-10-27' and
    1_day_change >= 300 and
    delta_aprox_14day_case_notification_rate_per_100k_hab >= 80
order by region asc
limit 50

Unnamed: 0,date,region,confirmed,1_day_change,3_day_change,7_day_change,confirmed_yesterday,confirmed_prediction,population,population_percentage_infected_rate_confirmed,population_percentage_factor_9a10_infected_rate_confirmed,delta_new_cases,delta_new_cases_per_1M_hab,delta_roling_7day_AVG,delta_aprox_14day_case_notification_rate_per_100k_hab
0,2020-10-27,Argentina_nan,0,11712.0,32933.0,99639.0,1102301.0,1119069.0,45106700.0,0.024438,0.244377,16768.0,371.740784,14234.142578,441.79245
1,2020-10-27,Armenia_nan,0,973.0,5500.0,13350.0,78810.0,81165.0,2962061.0,0.026607,0.266068,2355.0,795.054504,1907.142822,901.399414
2,2020-10-27,Austria_nan,0,2456.0,8852.0,17340.0,83267.0,85508.0,8822000.0,0.009439,0.094387,2241.0,254.024033,2477.142822,393.108124
3,2020-10-27,Azerbaijan_nan,0,527.0,2265.0,5191.0,50486.0,51286.0,10119557.0,0.004989,0.049891,800.0,79.054848,741.571411,102.593422
4,2020-10-27,Belarus_nan,0,884.0,2540.0,5417.0,93707.0,94377.0,9449974.0,0.009916,0.099162,670.0,70.899666,773.857117,114.645821
5,2020-10-27,Bosnia and Herzegovina_nan,0,703.0,3103.0,6935.0,41596.0,42680.0,3285028.0,0.012663,0.126626,1084.0,329.981964,990.714294,422.218628
6,2020-10-27,Brazil_nan,0,29219.0,56198.0,159127.0,5409854.0,5433387.0,212559008.0,0.025451,0.254511,23533.0,110.712791,22732.427734,149.725021
7,2020-10-27,Bulgaria_nan,0,2243.0,3613.0,9605.0,40132.0,41140.0,6959195.0,0.005767,0.057669,1008.0,144.844345,1372.142822,276.037659
8,2020-10-27,Canada_Alberta,0,1472.0,1472.0,3060.0,25733.0,25667.0,4345737.0,0.005922,0.059217,-66.0,-15.187298,437.142853,140.827667
9,2020-10-27,Canada_Ontario,0,841.0,2844.0,6235.0,73984.0,74708.0,14570000.0,0.005078,0.050779,724.0,49.691147,890.714294,85.586823


In [8]:
%%sql_display
select * from covid19_jh_analysis where region > 'Serbia_nan' and
    date >= '2020-10-27' and
    1_day_change >= 300 and
    delta_aprox_14day_case_notification_rate_per_100k_hab >= 80
order by region asc
limit 50

Unnamed: 0,date,region,confirmed,1_day_change,3_day_change,7_day_change,confirmed_yesterday,confirmed_prediction,population,population_percentage_infected_rate_confirmed,population_percentage_factor_9a10_infected_rate_confirmed,delta_new_cases,delta_new_cases_per_1M_hab,delta_roling_7day_AVG,delta_aprox_14day_case_notification_rate_per_100k_hab
0,2020-10-27,Slovakia_nan,0,1312.0,7244.0,14460.0,45155.0,47384.0,5459087.0,0.008272,0.082717,2229.0,408.310028,2065.714355,529.758911
1,2020-10-27,Slovenia_nan,0,1130.0,4773.0,10401.0,24080.0,25638.0,2078878.0,0.011584,0.115836,1558.0,749.442749,1485.857178,1000.635925
2,2020-10-27,Spain_nan,0,52188.0,52188.0,123871.0,1098320.0,1100715.0,46660000.0,0.023539,0.235388,2395.0,51.328762,17695.857422,530.951538
3,2020-10-27,Switzerland_nan,0,17440.0,17440.0,37934.0,121093.0,120679.0,8570000.0,0.01413,0.1413,-414.0,-48.308052,5419.143066,885.274231
4,2020-10-27,Tunisia_nan,0,3600.0,5185.0,9672.0,52399.0,52798.0,11791968.0,0.004444,0.044437,399.0,33.83659,1381.714233,164.043869
5,2020-10-27,Ukraine_nan,0,5625.0,19306.0,47061.0,359348.0,366701.0,43787980.0,0.008207,0.082066,7353.0,167.922806,6723.0,214.949402
6,2020-10-27,United Arab Emirates_nan,0,1111.0,3961.0,9717.0,126234.0,127777.0,9400000.0,0.013429,0.134293,1543.0,164.148941,1388.142822,206.744675
7,2020-10-27,United Kingdom_nan,0,20890.0,63692.0,153478.0,894690.0,916974.0,66440000.0,0.013466,0.134661,22284.0,335.40036,21925.427734,462.004822
8,2020-10-27,United States_nan,0,66784.0,211291.0,489769.0,8702750.0,8775155.0,327200000.0,0.026598,0.265976,72405.0,221.286682,69967.0,299.369812
9,2020-10-27,West Bank and Gaza_nan,0,510.0,1373.0,3336.0,50952.0,51403.0,4543126.0,0.011215,0.112154,451.0,99.270851,476.571442,146.859238


### Regions of Risk where population.csv has not been fullfiled and may compromise ratios

In [9]:
%%sql_display
select * from covid19_jh_analysis where region is not null and
    date >= '2020-10-27' and
    1_day_change >= 300 and
    delta_aprox_14day_case_notification_rate_per_100k_hab >= 80 and
    population >= 10000000 and population <= 10000000
order by region asc
limit 50

Unnamed: 0,date,region,confirmed,1_day_change,3_day_change,7_day_change,confirmed_yesterday,confirmed_prediction,population,population_percentage_infected_rate_confirmed,population_percentage_factor_9a10_infected_rate_confirmed,delta_new_cases,delta_new_cases_per_1M_hab,delta_roling_7day_AVG,delta_aprox_14day_case_notification_rate_per_100k_hab


In [10]:
%%sql_display
select * from covid19_jh_analysis where region is not null and
    date >= '2020-10-27' and
    1_day_change >= 100 and
    delta_aprox_14day_case_notification_rate_per_100k_hab >= 50 and
    population >= 10000000 and population <= 10000000
order by region asc
limit 50

Unnamed: 0,date,region,confirmed,1_day_change,3_day_change,7_day_change,confirmed_yesterday,confirmed_prediction,population,population_percentage_infected_rate_confirmed,population_percentage_factor_9a10_infected_rate_confirmed,delta_new_cases,delta_new_cases_per_1M_hab,delta_roling_7day_AVG,delta_aprox_14day_case_notification_rate_per_100k_hab


### Regions where population.cav has not been fulfilled and model has default 10,000,000 hab.

In [11]:
%%sql_display
select distinct(region) from covid19_jh_analysis where population >= 10000000 and population <= 10000000

Unnamed: 0,region
0,Papua New Guinea_nan
1,Gabon_nan
2,Madagascar_nan
3,Solomon Islands_nan
4,Kosovo_nan
5,Yemen_nan
6,Rwanda_nan
7,United Kingdom_British Virgin Islands
8,Australia_Tasmania
9,France_New Caledonia


In [13]:
%%sql_display
select distinct(region) from covid19_jh_analysis 
where where region is not null and
    date >= '2020-10-27' and population >= 10000000 and population <= 10000000 order by region asc

Unnamed: 0,region
0,Angola_nan
1,Antigua and Barbuda_nan
2,Australia_Australian Capital Territory
3,Australia_Northern Territory
4,Australia_Tasmania
5,Australia_Western Australia
6,Bahamas_nan
7,Barbados_nan
8,Belize_nan
9,Benin_nan


In [14]:
%%sql_display
select distinct(region) from covid19_jh_analysis 
where where date >= '2020-10-27' and region > 'France_Saint Pierre and Miquelon' and
     population >= 10000000 and population <= 10000000 order by region asc

Unnamed: 0,region
0,France_St Martin
1,Gabon_nan
2,Gambia_nan
3,Grenada_nan
4,Guinea-Bissau_nan
5,Guinea_nan
6,Guyana_nan
7,Haiti_nan
8,Jamaica_nan
9,Kosovo_nan


In [16]:
%%sql_display
select distinct(region) from covid19_jh_analysis 
where where date >= '2020-10-27' and region > 'Togo_nan' and
     population >= 10000000 and population <= 10000000 order by region asc

Unnamed: 0,region
0,Trinidad and Tobago_nan
1,Uganda_nan
2,United Kingdom_Anguilla
3,United Kingdom_Bermuda
4,United Kingdom_British Virgin Islands
5,United Kingdom_Cayman Islands
6,United Kingdom_Turks and Caicos Islands
7,Western Sahara_nan
8,Yemen_nan
9,Zambia_nan


In [17]:
#
print("Analysis Done!")
#

Analysis Done!


In [18]:
sc.stop()

In [19]:
exit()

ERROR:root:Invalid alias: The name clear can't be aliased because it is another magic command.
ERROR:root:Invalid alias: The name more can't be aliased because it is another magic command.
ERROR:root:Invalid alias: The name less can't be aliased because it is another magic command.
ERROR:root:Invalid alias: The name man can't be aliased because it is another magic command.
