# Retail Real-Time Demo

### Python Connectivity

In [None]:
### Importing library
! pip3 install scylla-driver ipyparallel

In [None]:
from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy, DowngradingConsistencyRetryPolicy, ConsistencyLevel, RoundRobinPolicy
from cassandra.query import tuple_factory
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark import SparkContext
from pyspark.sql import functions as F
import pandas as pd
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, FloatType, DateType, LongType

##### Set your Scylla IP(s):

In [None]:
import os

# Scylla contact points. Override with a comma-separated SCYLLA_IPS environment variable,
# e.g. SCYLLA_IPS="10.0.0.1,10.0.0.2"; defaults to the single demo node.
IPS = [ip.strip() for ip in os.environ.get("SCYLLA_IPS", "172.19.0.2").split(",") if ip.strip()]

#### Setup the execution profile:

In [None]:
# Default execution profile for every statement run through `session`.
profile = ExecutionProfile(
    # Token-aware routing layered over DC-aware round robin, local DC "datacenter1".
    load_balancing_policy=TokenAwarePolicy(DCAwareRoundRobinPolicy("datacenter1")),
    # NOTE(review): this retry policy is deprecated in the driver and may retry at a lower
    # consistency level than requested — confirm it is really wanted here.
    retry_policy=DowngradingConsistencyRetryPolicy(),
    consistency_level=ConsistencyLevel.LOCAL_QUORUM,
    # Serial consistency only applies to lightweight transactions (conditional statements).
    serial_consistency_level=ConsistencyLevel.LOCAL_SERIAL,
    # The driver's request_timeout is expressed in SECONDS; the previous 20000 meant ~5.5 hours.
    request_timeout=20,
    row_factory=tuple_factory
)

In [None]:
# Connect to the configured contact points, making `profile` the default execution profile.
profiles = {EXEC_PROFILE_DEFAULT: profile}
cluster = Cluster(contact_points=IPS, execution_profiles=profiles)
session = cluster.connect()


#### Create Keyspace and connect to it:

In [None]:
# Create the demo keyspace (no-op if it already exists) with one replica in "datacenter1".
create_keyspace = "CREATE KEYSPACE IF NOT EXISTS tpcds WITH replication = {'class': 'NetworkTopologyStrategy', 'datacenter1' : '1'};"
session.execute(create_keyspace)
# Switch the existing session to the keyspace. Calling cluster.connect('tpcds') here would open a
# second Session and leave the first one's connections open for the lifetime of the kernel.
session.set_keyspace('tpcds')


#### Create the tables from the CQL file:

In [None]:

# Run every ';'-separated statement of ddl.cql, echoing each one before it is executed.
# The split is purely textual: a ';' inside a string literal or comment would cut a statement.
with open('ddl.cql', mode='r') as ddl_file:
    ddl_text = ddl_file.read()
    for statement in (chunk.strip() for chunk in ddl_text.split(';')):
        if not statement:
            continue
        print(f'Executing: "{statement}"')
        session.execute(statement)
    

## Generate Data for Dimension Tables

#### Run these commands on your host, not inside the Docker container


sudo apt-get install gcc make flex bison byacc git
cd /home/$USER
git clone https://github.com/gregrahn/tpcds-kit
cd /home/$USER/tpcds-kit/tools/ 
make OS=LINUX 
mv /home/$USER/tpcds-kit/tools/ /home/$USER/scylla-code-samples/spark3-scylla4-demo/tools/


#### Generating pipe-delimited data files with dsdgen

In [10]:
!cd tools/ && ./dsdgen -sc 1 && mv ./tools/dsdgen/*.dat /home/jovyan/work/data/

dsdgen Population Generator (Version 2.10.0)
Copyright Transaction Processing Performance Council (TPC) 2001 - 2018
ERROR: ./call_center.dat exists. Either remove it or use the FORCE option to overwrite it.


### Loading using Spark
#### Spark Integration and configuration

In [None]:
from pyspark import SparkContext,SQLContext,SparkConf,StorageLevel
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

# Spark session used to bulk-load the dsdgen files into Scylla via the Spark Cassandra connector.
# The master is set with `.master()`: the former `.config("setMaster", ...)` only stored a key named
# "setMaster", which is not a Spark configuration property.
spark = (
    SparkSession.builder
    .appName("IoT-Scylla")
    .master("local[*]")
    # Assembly jar bundling the Spark Cassandra connector.
    .config("spark.jars", "target/scala-2.12/spark3-scylla4-example-assembly-0.1.jar")
    .config("spark.cassandra.connection.host", ','.join(IPS))
    .config('spark.cassandra.concurrent.reads', '512')
    .config('spark.cassandra.input.consistency.level', 'LOCAL_ONE')
    .config('spark.cassandra.input.fetch.sizeInRows', '1000')
    .config('spark.cassandra.input.split.sizeInMB', '512')
    .config("spark.driver.memory", "5g")
    .config("spark.executor.memory", "5g")
    .config("spark.driver.cores", 5)
    .getOrCreate()
)
sc = spark.sparkContext

#### Defining schemas and writing the dsdgen data files into Scylla using Spark

In [None]:
def load_dat_into_scylla(table, schema, path, keyspace="tpcds"):
    """Read pipe-delimited dsdgen file(s) with `schema` and append the rows into `keyspace.table`.

    Args:
        table: target table name inside `keyspace`.
        schema: StructType describing the columns of the file(s); the trailing "junk" field
            presumably absorbs the '|' that ends every dsdgen line — TODO confirm.
        path: file path or Hadoop glob of the .dat file(s) to read.
        keyspace: target keyspace, "tpcds" by default.

    Returns:
        The DataFrame that was read (and written), for optional inspection.
    """
    df = spark.read.format("csv").schema(schema).load(path, delimiter="|")
    (df.write.format("org.apache.spark.sql.cassandra")
        .options(table=table, keyspace=keyspace)
        .mode("append")
        .save())
    return df


print("store")
schema_store= StructType([StructField("s_store_sk",IntegerType(),True),StructField("s_store_id",StringType(),True),StructField("s_rec_start_date",StringType(),True),StructField("s_rec_end_date",StringType(),True),StructField("s_closed_date_sk",IntegerType(),True),StructField("s_store_name",StringType(),True),StructField("s_number_employees",IntegerType(),True),StructField("s_floor_space",IntegerType(),True),StructField("s_hours",StringType(),True),StructField("s_manager",StringType(),True),StructField("s_market_id",IntegerType(),True),StructField("s_geography_class",StringType(),True),StructField("s_market_desc",StringType(),True),StructField("s_market_manager",StringType(),True),StructField("s_division_id",IntegerType(),True),StructField("s_division_name",StringType(),True),StructField("s_company_id",IntegerType(),True),StructField("s_company_name",StringType(),True),StructField("s_street_number",StringType(),True),StructField("s_street_name",StringType(),True),StructField("s_street_type",StringType(),True),StructField("s_suite_number",StringType(),True),StructField("s_city",StringType(),True),StructField("s_county",StringType(),True),StructField("s_state",StringType(),True),StructField("s_zip",StringType(),True),StructField("s_country",StringType(),True),StructField("s_gmt_offset",FloatType(),True),StructField("s_tax_precentage",FloatType(),True),StructField("junk",StringType(),True),])
csv_store = load_dat_into_scylla("store", schema_store, "./work/data/store.dat")

print("customer_address")
schema_customer_address = StructType([ StructField("ca_address_sk",IntegerType(),True), StructField("ca_address_id",StringType(),True), StructField("ca_street_number",StringType(),True), StructField("ca_street_name",StringType(),True), StructField("ca_street_type",StringType(),True), StructField("ca_suite_number",StringType(),True), StructField("ca_city",StringType(),True), StructField("ca_county",StringType(),True), StructField("ca_state",StringType(),True), StructField("ca_zip",StringType(),True), StructField("ca_country",StringType(),True), StructField("ca_gmt_offset",FloatType(),True), StructField("ca_location_type",StringType(),True), StructField("junk",StringType(),True), ])
csv_customer_address = load_dat_into_scylla("customer_address", schema_customer_address, "./work/data/customer_address*.dat")

print("customer")
schema_customer = StructType([ StructField("c_customer_sk",IntegerType(),True), StructField("c_customer_id",StringType(),True), StructField("c_current_cdemo_sk",IntegerType(),True), StructField("c_current_hdemo_sk",IntegerType(),True), StructField("c_current_addr_sk",IntegerType(),True), StructField("c_first_shipto_date_sk",IntegerType(),True), StructField("c_first_sales_date_sk",IntegerType(),True), StructField("c_salutation",StringType(),True), StructField("c_first_name",StringType(),True), StructField("c_last_name",StringType(),True), StructField("c_preferred_cust_flag",StringType(),True), StructField("c_birth_day",IntegerType(),True), StructField("c_birth_month",IntegerType(),True), StructField("c_birth_year",IntegerType(),True), StructField("c_birth_country",StringType(),True), StructField("c_login",StringType(),True), StructField("c_email_address",StringType(),True), StructField("c_last_review_date",StringType(),True), StructField("junk",StringType(),True) ])
# Exact file name on purpose: the glob "customer*.dat" also matches customer_address.dat and
# customer_demographics.dat, whose rows would be mis-parsed and upserted over real customers.
csv_customer = load_dat_into_scylla("customer", schema_customer, "./work/data/customer.dat")

print("customer_demo")
schema_customer_demo = StructType([ StructField("cd_demo_sk",IntegerType(),True), StructField("cd_gender",StringType(),True), StructField("cd_marital_status",StringType(),True), StructField("cd_education_status",StringType(),True), StructField("cd_purchase_estimate",IntegerType(),True), StructField("cd_credit_rating",StringType(),True), StructField("cd_dep_count",IntegerType(),True), StructField("cd_dep_employed_count",IntegerType(),True), StructField("cd_dep_college_count",IntegerType(),True), StructField("junk",StringType(),True), ])
csv_customer_demo = load_dat_into_scylla("customer_demographics", schema_customer_demo, "./work/data/customer_demographics*.dat")

print("inventory")
schema_inventory = StructType([ StructField("inv_date_sk",IntegerType(),True), StructField("inv_item_sk",IntegerType(),True), StructField("inv_warehouse_sk",IntegerType(),True), StructField("inv_quantity_on_hand",IntegerType(),True), StructField("junk",IntegerType(),True),  ])
csv_inventory = load_dat_into_scylla("inventory", schema_inventory, "./work/data/inventory*.dat")

print("schema_web_site")
schema_web_site = StructType([StructField("web_site_sk",IntegerType(),True),StructField("web_site_id",StringType(),True),StructField("web_rec_start_date",StringType(),True),StructField("web_rec_end_date",StringType(),True),StructField("web_name",StringType(),True),StructField("web_open_date_sk",IntegerType(),True),StructField("web_close_date_sk",IntegerType(),True),StructField("web_class",StringType(),True),StructField("web_manager",StringType(),True),StructField("web_mkt_id",IntegerType(),True),StructField("web_mkt_class",StringType(),True),StructField("web_mkt_desc",StringType(),True),StructField("web_market_manager",StringType(),True),StructField("web_company_id",IntegerType(),True),StructField("web_company_name",StringType(),True),StructField("web_street_number",StringType(),True),StructField("web_street_name",StringType(),True),StructField("web_street_type",StringType(),True),StructField("web_suite_number",StringType(),True),StructField("web_city",StringType(),True),StructField("web_county",StringType(),True),StructField("web_state",StringType(),True),StructField("web_zip",StringType(),True),StructField("web_country",StringType(),True),StructField("web_gmt_offset",FloatType(),True),StructField("web_tax_percentage",FloatType(),True),StructField("junk",StringType(),True),   ])
csv_web_site = load_dat_into_scylla("web_site", schema_web_site, "./work/data/web_site*.dat")

print("schema_web_page")
schema_web_page = StructType([StructField("wp_web_page_sk",IntegerType(),True),StructField("wp_web_page_id",StringType(),True),StructField("wp_rec_start_date",StringType(),True),StructField("wp_rec_end_date",StringType(),True),StructField("wp_creation_date_sk",IntegerType(),True),StructField("wp_access_date_sk",IntegerType(),True),StructField("wp_autogen_flag",StringType(),True),StructField("wp_customer_sk",IntegerType(),True),StructField("wp_url",StringType(),True),StructField("wp_type",StringType(),True),StructField("wp_char_count",IntegerType(),True),StructField("wp_link_count",IntegerType(),True),StructField("wp_image_count",IntegerType(),True),StructField("wp_max_ad_count",IntegerType(),True),StructField("junk",StringType(),True),])
csv_web_page = load_dat_into_scylla("web_page", schema_web_page, "./work/data/web_page*.dat")

print("schema_warehouse")
schema_warehouse = StructType([StructField("w_warehouse_sk",IntegerType(),True),StructField("w_warehouse_id",StringType(),True),StructField("w_warehouse_name",StringType(),True),StructField("w_warehouse_sq_ft",IntegerType(),True),StructField("w_street_number",StringType(),True),StructField("w_street_name",StringType(),True),StructField("w_street_type",StringType(),True),StructField("w_suite_number",StringType(),True),StructField("w_city",StringType(),True),StructField("w_county",StringType(),True),StructField("w_state",StringType(),True),StructField("w_zip",StringType(),True),StructField("w_country",StringType(),True),StructField("w_gmt_offset",FloatType(),True),StructField("junk",StringType(),True), ])
csv_warehouse = load_dat_into_scylla("warehouse", schema_warehouse, "./work/data/warehouse*.dat")

print("schema_time")
schema_time= StructType([StructField("t_time_sk",IntegerType(),True),StructField("t_time_id",StringType(),True),StructField("t_time",IntegerType(),True),StructField("t_hour",IntegerType(),True),StructField("t_minute",IntegerType(),True),StructField("t_second",IntegerType(),True),StructField("t_am_pm",StringType(),True),StructField("t_shift",StringType(),True),StructField("t_sub_shift",StringType(),True),StructField("t_meal_time",StringType(),True),StructField("junk",StringType(),True), ])
csv_time_dim = load_dat_into_scylla("time_dim", schema_time, "./work/data/time_dim*.dat")

print("csv_ship_mode")
schema_ship_mode= StructType([StructField("sm_ship_mode_sk",IntegerType(),True),StructField("sm_ship_mode_id",StringType(),True),StructField("sm_type",StringType(),True),StructField("sm_code",StringType(),True),StructField("sm_carrier",StringType(),True),StructField("sm_contract",StringType(),True),StructField("junk",StringType(),True),])
csv_ship_mode = load_dat_into_scylla("ship_mode", schema_ship_mode, "./work/data/ship_mode*.dat")

print("schema_promotion")
schema_promotion= StructType([StructField("p_promo_sk",IntegerType(),True),StructField("p_promo_id",StringType(),True),StructField("p_start_date_sk",IntegerType(),True),StructField("p_end_date_sk",IntegerType(),True),StructField("p_item_sk",IntegerType(),True),StructField("p_cost",FloatType(),True),StructField("p_response_target",IntegerType(),True),StructField("p_promo_name",StringType(),True),StructField("p_channel_dmail",StringType(),True),StructField("p_channel_email",StringType(),True),StructField("p_channel_catalog",StringType(),True),StructField("p_channel_tv",StringType(),True),StructField("p_channel_radio",StringType(),True),StructField("p_channel_press",StringType(),True),StructField("p_channel_event",StringType(),True),StructField("p_channel_demo",StringType(),True),StructField("p_channel_details",StringType(),True),StructField("p_purpose",StringType(),True),StructField("p_discount_active",StringType(),True),StructField("junk",StringType(),True),])
csv_promotion = load_dat_into_scylla("promotion", schema_promotion, "./work/data/promotion*.dat")

print("schema_reason")
schema_reason= StructType([StructField("r_reason_sk",IntegerType(),True),StructField("r_reason_id",StringType(),True),StructField("r_reason_desc",StringType(),True),StructField("junk",StringType(),True),])
csv_reason = load_dat_into_scylla("reason", schema_reason, "./work/data/reason*.dat")

print("schema_item")
schema_item= StructType([StructField("i_item_sk",IntegerType(),True),StructField("i_item_id",StringType(),True),StructField("i_rec_start_date",StringType(),True),StructField("i_rec_end_date",StringType(),True),StructField("i_item_desc",StringType(),True),StructField("i_current_price",FloatType(),True),StructField("i_wholesale_cost",FloatType(),True),StructField("i_brand_id",IntegerType(),True),StructField("i_brand",StringType(),True),StructField("i_class_id",IntegerType(),True),StructField("i_class",StringType(),True),StructField("i_category_id",IntegerType(),True),StructField("i_category",StringType(),True),StructField("i_manufact_id",IntegerType(),True),StructField("i_manufact",StringType(),True),StructField("i_size",StringType(),True),StructField("i_formulation",StringType(),True),StructField("i_color",StringType(),True),StructField("i_units",StringType(),True),StructField("i_container",StringType(),True),StructField("i_manager_id",IntegerType(),True),StructField("i_product_name",StringType(),True),StructField("junk",StringType(),True),])
csv_item = load_dat_into_scylla("item", schema_item, "./work/data/item*.dat")

print("schema_date_dim")
schema_date_dim= StructType([StructField("d_date_sk",IntegerType(),True),StructField("d_date_id",StringType(),True),StructField("d_date",StringType(),True),StructField("d_month_seq",IntegerType(),True),StructField("d_week_seq",IntegerType(),True),StructField("d_quarter_seq",IntegerType(),True),StructField("d_year",IntegerType(),True),StructField("d_dow",IntegerType(),True),StructField("d_moy",IntegerType(),True),StructField("d_dom",IntegerType(),True),StructField("d_qoy",IntegerType(),True),StructField("d_fy_year",IntegerType(),True),StructField("d_fy_quarter_seq",IntegerType(),True),StructField("d_fy_week_seq",IntegerType(),True),StructField("d_day_name",StringType(),True),StructField("d_quarter_name",StringType(),True),StructField("d_holiday",StringType(),True),StructField("d_weekend",StringType(),True),StructField("d_following_holiday",StringType(),True),StructField("d_first_dom",IntegerType(),True),StructField("d_last_dom",IntegerType(),True),StructField("d_same_day_ly",IntegerType(),True),StructField("d_same_day_lq",IntegerType(),True),StructField("d_current_day",StringType(),True),StructField("d_current_week",StringType(),True),StructField("d_current_month",StringType(),True),StructField("d_current_quarter",StringType(),True),StructField("d_current_year",StringType(),True),StructField("junk",StringType(),True),])
csv_date_dim = load_dat_into_scylla("date_dim", schema_date_dim, "./work/data/date_dim*.dat")

print("schema_household_demographics")
schema_household_demographics= StructType([StructField("hd_demo_sk",IntegerType(),True),StructField("hd_income_band_sk",IntegerType(),True),StructField("hd_buy_potential",StringType(),True),StructField("hd_dep_count",IntegerType(),True),StructField("hd_vehicle_count",IntegerType(),True),StructField("junk",StringType(),True),])
csv_household_demographics = load_dat_into_scylla("household_demographics", schema_household_demographics, "./work/data/household_demographics*.dat")

print("schema_income_band")
schema_income_band= StructType([StructField("ib_income_band_sk",IntegerType(),True),StructField("ib_lower_bound",IntegerType(),True),StructField("ib_upper_bound",IntegerType(),True),StructField("junk",StringType(),True),])
csv_income_band = load_dat_into_scylla("income_band", schema_income_band, "./work/data/income_band*.dat")


## REAL TIME TABLE TO BE USED WITH NOSQLBENCH
# schema_web_sales = StructType([StructField("ws_sold_date_sk",IntegerType(),True),StructField("ws_sold_time_sk",IntegerType(),True),StructField("ws_ship_date_sk",IntegerType(),True),StructField("ws_item_sk",IntegerType(),True),StructField("ws_bill_customer_sk",IntegerType(),True),StructField("ws_bill_cdemo_sk",IntegerType(),True),StructField("ws_bill_hdemo_sk",IntegerType(),True),StructField("ws_bill_addr_sk",IntegerType(),True),StructField("ws_ship_customer_sk",IntegerType(),True),StructField("ws_ship_cdemo_sk",IntegerType(),True),StructField("ws_ship_hdemo_sk",IntegerType(),True),StructField("ws_ship_addr_sk",IntegerType(),True),StructField("ws_web_page_sk",IntegerType(),True),StructField("ws_web_site_sk",IntegerType(),True),StructField("ws_ship_mode_sk",IntegerType(),True),StructField("ws_warehouse_sk",IntegerType(),True),StructField("ws_promo_sk",IntegerType(),True),StructField("ws_order_number",IntegerType(),True),StructField("ws_quantity",IntegerType(),True),StructField("ws_wholesale_cost",FloatType(),True),StructField("ws_list_price",FloatType(),True),StructField("ws_sales_price",FloatType(),True),StructField("ws_ext_discount_amt",FloatType(),True),StructField("ws_ext_sales_price",FloatType(),True),StructField("ws_ext_wholesale_cost",FloatType(),True),StructField("ws_ext_list_price",FloatType(),True),StructField("ws_ext_tax",FloatType(),True),StructField("ws_coupon_amt",FloatType(),True),StructField("ws_ext_ship_cost",FloatType(),True),StructField("ws_net_paid",FloatType(),True),StructField("ws_net_paid_inc_tax",FloatType(),True),StructField("ws_net_paid_inc_ship",FloatType(),True),StructField("ws_net_paid_inc_ship_tax",FloatType(),True),StructField("ws_net_profit",FloatType(),True),StructField("junk",StringType(),True),])
# csv_web_sales = spark.read.format("csv").schema(schema_web_sales).load("/home/ricardo/tpcds-kit/./work/data/out/web_sales*.dat", delimiter="|")
# csv_web_sales.write.format("org.apache.spark.sql.cassandra").options(table="web_sales",keyspace="tpcds").mode("append").save()

# print("catalog_sales")
# catalog_sales_schema = StructType([ StructField("cs_sold_date_sk",IntegerType(),True), StructField("cs_sold_time_sk",IntegerType(),True), StructField("cs_ship_date_sk",IntegerType(),True), StructField("cs_bill_customer_sk",IntegerType(),True), StructField("cs_bill_cdemo_sk",IntegerType(),True), StructField("cs_bill_hdemo_sk",IntegerType(),True), StructField("cs_bill_addr_sk",IntegerType(),True), StructField("cs_ship_customer_sk",IntegerType(),True), StructField("cs_ship_cdemo_sk",IntegerType(),True), StructField("cs_ship_hdemo_sk",IntegerType(),True), StructField("cs_ship_addr_sk",IntegerType(),True), StructField("cs_call_center_sk",IntegerType(),True), StructField("cs_catalog_page_sk",IntegerType(),True), StructField("cs_ship_mode_sk",IntegerType(),True), StructField("cs_warehouse_sk",IntegerType(),True), StructField("cs_item_sk",IntegerType(),True), StructField("cs_promo_sk",IntegerType(),True), StructField("cs_order_number",IntegerType(),True), StructField("cs_quantity",IntegerType(),True), StructField("cs_wholesale_cost",FloatType(),True), StructField("cs_list_price",FloatType(),True), StructField("cs_sales_price",FloatType(),True), StructField("cs_ext_discount_amt",FloatType(),True), StructField("cs_ext_sales_price",FloatType(),True), StructField("cs_ext_wholesale_cost",FloatType(),True), StructField("cs_ext_list_price",FloatType(),True), StructField("cs_ext_tax",FloatType(),True), StructField("cs_coupon_amt",FloatType(),True), StructField("cs_ext_ship_cost",FloatType(),True), StructField("cs_net_paid",FloatType(),True), StructField("cs_net_paid_inc_tax",FloatType(),True), StructField("cs_net_paid_inc_ship",FloatType(),True), StructField("cs_net_paid_inc_ship_tax",FloatType(),True), StructField("cs_net_profit",FloatType(),True), StructField("junk",StringType(),True), ])
# csv_catalog_sales = spark.read.format("csv").schema(catalog_sales_schema).load("./work/data/catalog_sales*.dat", delimiter="|")
# csv_catalog_sales.write.format("org.apache.spark.sql.cassandra").options(table="catalog_sales",keyspace="tpcds").mode("append").save()


# print("schema_store_returns")
# schema_store_returns = StructType([ StructField("sr_returned_date_sk",IntegerType(),True), StructField("sr_return_time_sk",IntegerType(),True), StructField("sr_item_sk",IntegerType(),True), StructField("sr_customer_sk",IntegerType(),True), StructField("sr_cdemo_sk",IntegerType(),True), StructField("sr_hdemo_sk",IntegerType(),True), StructField("sr_addr_sk",IntegerType(),True), StructField("sr_store_sk",IntegerType(),True), StructField("sr_reason_sk",IntegerType(),True), StructField("sr_ticket_number",IntegerType(),True), StructField("sr_return_quantity",IntegerType(),True), StructField("sr_return_amt",FloatType(),True), StructField("sr_return_tax",FloatType(),True), StructField("sr_return_amt_inc_tax",FloatType(),True), StructField("sr_fee",FloatType(),True), StructField("sr_return_ship_cost",FloatType(),True), StructField("sr_refunded_cash",FloatType(),True), StructField("sr_reversed_charge",FloatType(),True), StructField("sr_store_credit",FloatType(),True), StructField("sr_net_loss",FloatType(),True), StructField("junk",StringType(),True)])
# csv_store_returns = spark.read.format("csv").schema(schema_store_returns).load("./work/data/store_returns*.dat", delimiter="|")
# csv_store_returns.write .format("org.apache.spark.sql.cassandra") .options(table="store_returns",keyspace="tpcds") .mode("append").save()   


# print("schema_web_returns")
# schema_web_returns = StructType([StructField("wr_returned_date_sk",IntegerType(),True),StructField("wr_returned_time_sk",IntegerType(),True),StructField("wr_item_sk",IntegerType(),True),StructField("wr_refunded_customer_sk",IntegerType(),True),StructField("wr_refunded_cdemo_sk",IntegerType(),True),StructField("wr_refunded_hdemo_sk",IntegerType(),True),StructField("wr_refunded_addr_sk",IntegerType(),True),StructField("wr_returning_customer_sk",IntegerType(),True),StructField("wr_returning_cdemo_sk",IntegerType(),True),StructField("wr_returning_hdemo_sk",IntegerType(),True),StructField("wr_returning_addr_sk",IntegerType(),True),StructField("wr_web_page_sk",IntegerType(),True),StructField("wr_reason_sk",IntegerType(),True),StructField("wr_order_number",IntegerType(),True),StructField("wr_return_quantity",IntegerType(),True),StructField("wr_return_amt",FloatType(),True),StructField("wr_return_tax",FloatType(),True),StructField("wr_return_amt_inc_tax",FloatType(),True),StructField("wr_fee",FloatType(),True),StructField("wr_return_ship_cost",FloatType(),True),StructField("wr_refunded_cash",FloatType(),True),StructField("wr_reversed_charge",FloatType(),True),StructField("wr_account_credit",FloatType(),True),StructField("wr_net_loss",FloatType(),True),StructField("junk",StringType(),True),])
# csv_web_returns = spark.read.format("csv").schema(schema_web_returns).load("/home/ricardo/tpcds-kit/./work/data/out/web_returns*.dat", delimiter="|")
# csv_web_returns.write .format("org.apache.spark.sql.cassandra") .options(table="web_returns",keyspace="tpcds") .mode("append").save()

# print("store_sales")
# schema_store_sales = StructType([ StructField("ss_sold_date_sk",IntegerType(),True), StructField("ss_sold_time_sk",IntegerType(),True), StructField("ss_item_sk",IntegerType(),True), StructField("ss_customer_sk",IntegerType(),True), StructField("ss_cdemo_sk",IntegerType(),True), StructField("ss_hdemo_sk",IntegerType(),True), StructField("ss_addr_sk",IntegerType(),True), StructField("ss_store_sk",IntegerType(),True), StructField("ss_promo_sk",IntegerType(),True), StructField("ss_ticket_number",IntegerType(),True), StructField("ss_quantity",IntegerType(),True), StructField("ss_wholesale_cost",FloatType(),True), StructField("ss_list_price",FloatType(),True), StructField("ss_sales_price",FloatType(),True), StructField("ss_ext_discount_amt",FloatType(),True), StructField("ss_ext_sales_price",FloatType(),True), StructField("ss_ext_wholesale_cost",FloatType(),True), StructField("ss_ext_list_price",FloatType(),True), StructField("ss_ext_tax",FloatType(),True), StructField("ss_coupon_amt",FloatType(),True), StructField("ss_net_paid",FloatType(),True), StructField("ss_net_paid_inc_tax",FloatType(),True), StructField("ss_net_profit",FloatType(),True), StructField("junk",StringType(),True),  ])
# csv_store_sales = spark.read.format("csv").schema(schema_store_sales).load("./work/data/store_sales*.dat", delimiter="|")
# csv_store_sales.write .format("org.apache.spark.sql.cassandra") .options(table="store_sales",keyspace="tpcds") .mode("append").save() 





## Generate Real-time Workload with NoSQLBench

#### In a terminal, start the workload:


./nb run driver=cql workload=/home/jovyan/work/workload-nosqlbench.yaml tags=phase:schema threads=auto cycles=11

## Running Analytical SQL Queries with SparkSQL and Scylla

In [None]:
import os
from pyspark import SparkContext,SQLContext,SparkConf,StorageLevel
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

# Scylla contact points; override with a comma-separated SCYLLA_IPS environment variable.
IPS = [ip.strip() for ip in os.environ.get("SCYLLA_IPS", "172.19.0.2").split(",") if ip.strip()]

# Spark session for the analytical queries. Note: if a session already exists in this kernel
# (e.g. from the loading step), getOrCreate() returns it, and JVM-level settings such as
# spark.driver.memory cannot change until the kernel is restarted.
spark = (
    SparkSession.builder
    .appName("IoT-Scylla")
    # `.master()` sets the master; `.config("setMaster", ...)` is not a Spark property.
    .master("local[*]")
    .config("spark.jars", "target/scala-2.12/spark3-scylla4-example-assembly-0.1.jar")
    .config("spark.cassandra.connection.host", ','.join(IPS))
    .config('spark.cassandra.concurrent.reads', '20480')
    .config('spark.cassandra.input.consistency.level', 'LOCAL_ONE')
    .config('spark.cassandra.input.fetch.sizeInRows', '2000')
    .config('spark.cassandra.input.split.sizeInMB', '512')
    .config("spark.driver.memory", "5g")
    .config("spark.executor.memory", "5g")
    .config("spark.driver.cores", 5)
    .getOrCreate()
)
sc = spark.sparkContext
# SQLContext takes the SparkContext first and the SparkSession second; passing the session
# as the first argument only worked by accident.
sqlContext = SQLContext(sc, spark)

# Show both effective memory settings (previously only the last expression was displayed).
sc.getConf().get('spark.executor.memory'), sc.getConf().get('spark.driver.memory')


### Registering Tables

In [None]:
def register_scylla_table(name, keyspace="tpcds"):
    """Load `keyspace.name` through the Spark Cassandra connector and expose it to SQL as `name`.

    Uses createOrReplaceTempView, the non-deprecated replacement for registerTempTable.
    Returns the DataFrame so it can also be used directly from Python.
    """
    df = (spark.read.format("org.apache.spark.sql.cassandra")
          .options(table=name, keyspace=keyspace)
          .load())
    df.createOrReplaceTempView(name)
    return df


call_center = register_scylla_table("call_center")
catalog_page = register_scylla_table("catalog_page")
catalog_returns = register_scylla_table("catalog_returns")
catalog_sales = register_scylla_table("catalog_sales")
customer = register_scylla_table("customer")
customer_address = register_scylla_table("customer_address")
customer_demographics = register_scylla_table("customer_demographics")
date_dim = register_scylla_table("date_dim")
household_demographics = register_scylla_table("household_demographics")
income_band = register_scylla_table("income_band")
inventory = register_scylla_table("inventory")
item = register_scylla_table("item")
promotion = register_scylla_table("promotion")
reason = register_scylla_table("reason")
ship_mode = register_scylla_table("ship_mode")
store = register_scylla_table("store")
store_returns = register_scylla_table("store_returns")
store_sales = register_scylla_table("store_sales")
time_dim = register_scylla_table("time_dim")
warehouse = register_scylla_table("warehouse")
web_page = register_scylla_table("web_page")
web_returns = register_scylla_table("web_returns")
web_sales = register_scylla_table("web_sales")
web_site = register_scylla_table("web_site")

### Running SQL Queries

In [None]:
%%time
# List the d_date of every web sale whose sold date falls between 2021-06-20 and 2021-06-21,
# resolving ws_sold_date_sk through the date_dim dimension. `.show()` prints the first rows.
# The `\'` escapes are equivalent to plain quotes inside this triple-quoted string.
# NOTE(review): the BETWEEN compares d_date with string literals — confirm d_date's column type in ddl.cql.
query = '''select d_date from web_sales join date_dim on ws_sold_date_sk = d_date_sk
                              where d_date between \'2021-06-20' and \'2021-06-21\''''
sqlContext.sql(query).show()


In [None]:
%%time
# Web revenue per item for the Jewelry, Sports and Books categories on a single sold-date key
# (ws_sold_date_sk = 2459247), grouped by item attributes, at most 1000 rows; `.show(100)` prints 100.
# NOTE(review): this looks adapted from TPC-DS query 12, whose revenue-ratio window column is not
# computed here (only the "-- revenueratio" placeholder remains) — confirm that is intended.
query1= '''select 
    i_item_desc,
  i_category,
  i_class,
  i_current_price,
  i_item_id,
  sum(ws_ext_sales_price) as itemrevenue
from
  web_sales
  join item on (web_sales.ws_item_sk = item.i_item_sk)
where
  i_category in('Jewelry', 'Sports', 'Books')
  and ws_sold_date_sk = 2459247
group by
  i_item_id,
  i_item_desc,
  i_category,
  i_class,
  i_current_price
order by
  i_category,
  i_class,
  i_item_id,
  i_item_desc
  -- revenueratio
limit 1000;'''

sqlContext.sql(query1).show(100)
