# Example: Spark + DSE

At BGDS, we advocate the combination of the following tools as the best solution to big data problems.

- Apache Spark
- Apache Cassandra (DSE)
- Apache Solr (DSE)

This notebook shows how to use these tools together to analyze the data crawled by the `Bovespa` example crawler included in the DaVinci distribution.

To run this notebook, first start the `Bovespa` crawler and let it crawl all the data.

```bash
python manage.py crawl bovespa \
    --workers-num 10 \
    --chromium-bin-file '/Applications/Chromium.app/Contents/MacOS/Chromium' \
    --io-gs-project centering-badge-212119 \
    --cache-dir "gs://davinci_example_bovespa" \
    --local-dir "fs:///data/bovespa/local"
```

In [1]:
from IPython.display import display, HTML

In [2]:
# Configurations related to Cassandra connector & Cluster
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages datastax:spark-cassandra-connector:2.4.0-s_2.11 pyspark-shell'

In [3]:
# Creating Spark Context
from pyspark import SparkContext, SparkConf

conf = SparkConf() \
 .setAppName("Factor Analysis Job") \
 .set("spark.cassandra.connection.host", "dse_seed_backend") \
 .set("spark.sql.dse.search.enableOptimization", "on") 
# .set("spark.cassandra.auth.username", "cassandra") \
# .set("spark.cassandra.auth.password", "sQQE87Nt")  

# spark_master = "spark://127.0.0.1:7077"
spark_master = "local"
sc = SparkContext(spark_master, "Factor Analysis Job", conf=conf)

In [4]:
# Creating PySpark SQL Context
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

In [5]:
# Utility function that returns a data frame attached to the given keyspace.table
def load_and_get_table_df(keys_space_name, table_name):
    table_df = sqlContext.read \
        .format("org.apache.spark.sql.cassandra") \
        .options(table=table_name, keyspace=keys_space_name) \
        .option("spark.sql.dse.search.enableOptimization", "on") \
        .load()
    return table_df

In [6]:
from pyspark.sql.functions import udf, col

# Loading Bovespa companies table
companies = load_and_get_table_df("davinci", "bovespa_company")
companies.filter(~col("situation").eqNullSafe("CANCELED")).show(10)

IllegalArgumentException: 'Cannot build a cluster without contact points'

In [None]:
# Check the execution plan of the query
companies.filter(~col("situation").eqNullSafe("CANCELED")).explain(True)
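
The filter above uses `eqNullSafe` instead of `!=`. Under SQL three-valued logic a plain comparison against NULL evaluates to NULL, and a filter drops such rows, whereas the negated null-safe comparison keeps them. A minimal plain-Python sketch of that difference, with `None` standing in for NULL; the helper names and the sample rows are illustrative and not taken from the `bovespa_company` table:

```python
def sql_not_equal(value, literal):
    """Mimic SQL `value != literal`: the result is NULL (None) when value is NULL."""
    if value is None:
        return None
    return value != literal


def not_eq_null_safe(value, literal):
    """Mimic `~col.eqNullSafe(literal)` for a non-null literal: NULL never equals it."""
    return not (value == literal)


def sql_filter(rows, predicate):
    """Keep only rows whose predicate is exactly True, as a WHERE clause does."""
    return [row for row in rows if predicate(row) is True]


# Hypothetical sample rows: (company, situation)
rows = [("A", "ACTIVE"), ("B", "CANCELED"), ("C", None)]

plain = sql_filter(rows, lambda r: sql_not_equal(r[1], "CANCELED"))
null_safe = sql_filter(rows, lambda r: not_eq_null_safe(r[1], "CANCELED"))
```

In this sketch `plain` loses company `C`, whose situation is unknown, while `null_safe` keeps it.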

In [None]:
# Access the fundamental data of the companies by attaching a data frame
# to the accounting notes available in the database
fundamentals = load_and_get_table_df("davinci", "bovespa_account")
fundamentals.show(3)

In [None]:
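# Explicit imports instead of a wildcard; note that `abs` here is the Spark
# column function and shadows the Python builtin for the rest of the notebook
from pyspark.sql.functions import abs, asc, avg, col, rank, sqrt, stddev, when
from pyspark.sql import Window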

#factors_df = fundamentals.filter(
#    col("number").isin(["6.02","1"]) & col("ccvm").isin(["13773", "22306", "1023", "10472"]))

# Get the Capital Expenditures from the Fundamentals
# Account 1    = Total Assets of the company
# Account 6.02 = Capital Expenditure reported by the company during the period
factors_df = fundamentals.filter(
    col("number").isin(["6.02", "1"]))
factors_df = factors_df.withColumn(
    "factor_name", when(factors_df.number == "1", "total_assets").otherwise("cap_ex_reported"))

factors_df = factors_df\
    .select(col("ccvm").alias("asset"), 
            col("period").alias("astodate"),
            col("factor_name"), 
            col("amount").alias("amount"))

factors_df = factors_df.groupby(col("asset"), col("astodate"))\
    .pivot("factor_name").sum("amount").orderBy("asset", "astodate")

factors_df = factors_df.withColumn(
    "cap_ex_reported_scaled", 
    when(factors_df.cap_ex_reported > 0, 0).otherwise(abs(factors_df.cap_ex_reported)) / factors_df.total_assets)

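# Rolling volatility over the current and five preceding periods; the window
# must be ordered by period for the row frame to be well defined
factors_df = factors_df.withColumn(
    "capex_vol_6q",
    stddev(col("cap_ex_reported_scaled")).over(
        Window.partitionBy("asset").orderBy("astodate").rowsBetween(-5, 0)))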

factors_df = factors_df.withColumn("capex_vol_6q_ranked", rank()\
             .over(Window.partitionBy("astodate").orderBy(asc("capex_vol_6q"))))
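
The cell above chains three steps: scaling capital expenditure by total assets, a rolling standard deviation over the current and five preceding rows, and a cross-sectional rank per date. The plain-Python sketch below mirrors those steps on a few made-up numbers so the arithmetic can be checked by hand. It assumes the rows of an asset are already sorted by period, that Spark's `stddev` is the sample standard deviation, and that `rank` gives tied values the same rank and then skips ahead; the function names and the data are illustrative only.

```python
from statistics import stdev


def scale_capex(cap_ex, total_assets):
    """Positive values become 0, outflows become positive, then divide by assets."""
    outflow = 0.0 if cap_ex > 0 else abs(cap_ex)
    return outflow / total_assets


def rolling_sample_std(values, window=6):
    """Sample std over the current value and up to window - 1 preceding ones.

    Returns None where fewer than two values are available.
    """
    out = []
    for i in range(len(values)):
        chunk = values[max(0, i - window + 1): i + 1]
        out.append(stdev(chunk) if len(chunk) > 1 else None)
    return out


def rank_ascending(values):
    """SQL-style rank(): ties share a rank and the next rank skips ahead."""
    ordered = sorted(values)
    return [ordered.index(v) + 1 for v in values]


# Made-up quarterly figures for one asset: (cap_ex_reported, total_assets)
quarters = [(-10.0, 100.0), (-20.0, 100.0), (5.0, 100.0), (-30.0, 100.0)]
scaled = [scale_capex(c, a) for c, a in quarters]
vol = rolling_sample_std(scaled)

# Made-up volatilities of four assets on a single date
ranks = rank_ascending([0.05, 0.02, 0.05, 0.10])
```

The positive figure in the third quarter is treated as zero expenditure, and the first quarter has no volatility because a sample standard deviation needs at least two observations.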

In [None]:
factors_df.select(col("astodate"), col("asset"), col("capex_vol_6q"), col("capex_vol_6q_ranked")).show(100)

In [None]:
factors_df.cache()

In [None]:
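def z_score_w(column, w):
    # Population z-score over window w, using Var(x) = E[x^2] - E[x]^2.
    # The parameter is not named `col` to avoid shadowing the imported function.
    avg_ = avg(column).over(w)
    avg_sq = avg(column * column).over(w)
    sd_ = sqrt(avg_sq - avg_ * avg_)
    return (column - avg_) / sd_

w = Window.partitionBy("astodate")
factors_df_standard = factors_df.withColumn("capex_vol_6q_ranked_zscored", 
                                            z_score_w(factors_df.capex_vol_6q_ranked, w))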
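
`z_score_w` derives the standard deviation from the identity Var(x) = E[x²] − E[x]², which gives the population (not the sample) standard deviation within each window. A minimal plain-Python sketch of the same arithmetic for a single partition, using made-up ranks; the function name is illustrative:

```python
from math import sqrt
from statistics import pstdev


def z_scores(values):
    """Standardize values with the population standard deviation."""
    mean = sum(values) / len(values)
    mean_sq = sum(v * v for v in values) / len(values)
    sd = sqrt(mean_sq - mean * mean)  # Var(x) = E[x^2] - E[x]^2
    return [(v - mean) / sd for v in values]


ranks = [1, 2, 3, 4]          # made-up ranks for one date
scores = z_scores(ranks)
reference_sd = pstdev(ranks)  # population std from the standard library
```

If every value in a partition is identical the standard deviation is zero, and this sketch raises `ZeroDivisionError`.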

In [None]:
factors_df_standard.select(
    col("astodate"), 
    col("asset"), 
    col("capex_vol_6q_ranked_zscored")).show(100)

In [None]:
factors_data = factors_df_standard.select(
    col("astodate"), 
    col("asset"), 
    col("capex_vol_6q_ranked_zscored").alias("capex_vol_6q")).limit(10).toPandas()

# Persist the results

Now it is time to save the factor into a table. To do this, the system should have created a 
keyspace for us in the database. The keyspace should have the same TOKEN as the one used
to connect through the API.

Each user will have its own space in the system to run the analyses.

```sql
CREATE KEYSPACE IF NOT EXISTS <USER_ID>_analysis WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};
```

with full access to it:

```sql
-- Create the user's role, using the token as its password
CREATE ROLE user_<USER_ID> WITH PASSWORD = '<USER_TOKEN>' AND LOGIN = true AND SUPERUSER = false;

-- Grant all permissions on the user's keyspace
GRANT ALL PERMISSIONS ON KEYSPACE <USER_ID>_analysis TO user_<USER_ID>;
```
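
The provisioning statements above could be generated per user. The sketch below is one possible way to do that; `build_provisioning_statements`, the identifier check and the sample token are assumptions made for illustration and are not part of DaVinci.

```python
import re

# Assumed rule: lowercase unquoted CQL identifiers only, so the user id can be
# embedded in keyspace and role names without quoting.
_IDENTIFIER = re.compile(r"^[a-z][a-z0-9_]*$")


def build_provisioning_statements(user_id, user_token, replication_factor=1):
    """Return the CQL statements that create a user's keyspace and role."""
    if not _IDENTIFIER.match(user_id):
        raise ValueError("user_id must be a lowercase CQL identifier")
    keyspace = f"{user_id}_analysis"
    role = f"user_{user_id}"
    password = user_token.replace("'", "''")  # escape quotes in the CQL string
    return [
        f"CREATE KEYSPACE IF NOT EXISTS {keyspace} WITH replication = "
        f"{{'class': 'SimpleStrategy', 'replication_factor': {int(replication_factor)}}};",
        f"CREATE ROLE {role} WITH PASSWORD = '{password}' "
        f"AND LOGIN = true AND SUPERUSER = false;",
        f"GRANT ALL PERMISSIONS ON KEYSPACE {keyspace} TO {role};",
    ]


# "example-token" is a placeholder, not a real credential
statements = build_provisioning_statements("xalperte", "example-token")
```

Each returned string could then be passed to `session.execute` on a connection that has permission to create keyspaces and roles.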

PRO features:
- Allow replication of data (ReplicationFactor = 3)
    ```sql
    CREATE KEYSPACE IF NOT EXISTS <TOKEN>_analysis WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 3};
    ```
- Network replication (multiple DataCenters)
    ```sql
    CREATE KEYSPACE IF NOT EXISTS <TOKEN>_analysis WITH replication = {'class': 'NetworkTopologyStrategy', 'DC1' : 2, 'DC2': 3, 'DC3': 2};
    ```
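
The two replication options above differ only in the replication map. As a rough sketch, a helper like the hypothetical `replication_map` below could render either form; the function and its parameters are assumptions for illustration.

```python
def replication_map(replication_factor=None, datacenters=None):
    """Render a CQL replication map for one of the two strategies.

    Pass `replication_factor` for SimpleStrategy, or `datacenters`
    (a dict of data center name to replica count) for NetworkTopologyStrategy.
    """
    if (replication_factor is None) == (datacenters is None):
        raise ValueError("pass exactly one of replication_factor or datacenters")
    if datacenters is not None:
        parts = ["'class': 'NetworkTopologyStrategy'"]
        parts += [f"'{dc}': {int(count)}" for dc, count in datacenters.items()]
    else:
        parts = ["'class': 'SimpleStrategy'",
                 f"'replication_factor': {int(replication_factor)}"]
    return "{" + ", ".join(parts) + "}"


simple = replication_map(replication_factor=3)
multi_dc = replication_map(datacenters={"DC1": 2, "DC2": 3, "DC3": 2})
```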

In [None]:
try:
    from dse.cluster import Cluster
    from dse.auth import DSEPlainTextAuthProvider
except ImportError:
    from cassandra.cluster import Cluster
    from cassandra.auth import DSEPlainTextAuthProvider

# auth_provider = DSEPlainTextAuthProvider('cassandra', 'xxxx')

#cluster = Cluster(['10.154.0.6', '10.154.0.3'], auth_provider=auth_provider)  # provide contact points and port
cluster = Cluster(['127.0.0.1'])  # provide contact points and port
session = cluster.connect('xalperte_analysis')

In [None]:
try:
    from dse.cqlengine.models import Model
    from dse.cqlengine import columns
    from dse.cqlengine.management import sync_table
    from dse.cqlengine.connection import register_connection
except ImportError:
    from cassandra.cqlengine.models import Model
    from cassandra.cqlengine import columns
    from cassandra.cqlengine.management import sync_table
    from cassandra.cqlengine.connection import register_connection
    
from pyspark.sql.types import StringType, DateType, TimestampType
from pyspark.sql.types import FloatType, DecimalType, DoubleType
from pyspark.sql.types import IntegerType

# Spark SQL types and the cqlengine column classes they are mapped to.
# Fields of any other type are skipped.
_SPARK_TO_CQLENGINE = (
    (StringType, columns.Text),
    (DateType, columns.Date),
    (TimestampType, columns.DateTime),
    (DoubleType, columns.Double),
    (DecimalType, columns.Decimal),
    (FloatType, columns.Float),
    (IntegerType, columns.Integer),
)

def create_cassandra_model(schema, table_name, primary_columns=None):
    # Build a cqlengine Model class from a Spark DataFrame schema
    if not isinstance(primary_columns, list):
        primary_columns = [primary_columns]

    df_fields = {field.name: field for field in schema.fields}

    metadata = {
        "__table_name__": table_name
    }

    def to_column(field, primary_key):
        for spark_type, column_class in _SPARK_TO_CQLENGINE:
            if isinstance(field.dataType, spark_type):
                return column_class(required=(not field.nullable), primary_key=primary_key)
        return None

    # First the primary keys, in the order given
    for field_name in primary_columns:
        column = to_column(df_fields[field_name], True)
        if column is not None:
            metadata[field_name] = column

    # Then the remaining columns
    for field_name, field in df_fields.items():
        if field_name not in primary_columns:
            column = to_column(field, False)
            if column is not None:
                metadata[field_name] = column

    return type('MyModel', (Model,), metadata)
            
            
model = create_cassandra_model(factors_df_standard.schema, "capex_vol_6q", "asset")
register_connection("my_connection", session=session)
sync_table(model, keyspaces=["xalperte_analysis"], connections=["my_connection"])
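
To make the type mapping in `create_cassandra_model` easier to inspect, the sketch below renders a `CREATE TABLE` statement from (name, type) pairs using the same correspondences. It does not use cqlengine, and the statement that `sync_table` actually issues may differ; the function name, the simplified type names and the sample column types are assumptions.

```python
# Same correspondences as create_cassandra_model, keyed by simplified type names
SPARK_TO_CQL_NAMES = {
    "string": "text",
    "date": "date",
    "timestamp": "timestamp",
    "double": "double",
    "decimal": "decimal",
    "float": "float",
    "int": "int",
}


def create_table_cql(keyspace, table, fields, primary_keys):
    """Render a CREATE TABLE statement from (name, spark_type) pairs."""
    columns_cql = [
        f"{name} {SPARK_TO_CQL_NAMES[spark_type]}"
        for name, spark_type in fields
        if spark_type in SPARK_TO_CQL_NAMES  # unmapped types are skipped
    ]
    return (
        f"CREATE TABLE IF NOT EXISTS {keyspace}.{table} "
        f"({', '.join(columns_cql)}, PRIMARY KEY ({', '.join(primary_keys)}));"
    )


# Assumed column types; the real ones depend on the bovespa_account schema
fields = [
    ("asset", "string"),
    ("astodate", "timestamp"),
    ("capex_vol_6q", "double"),
    ("row_count", "bigint"),  # not in the mapping, so it is left out
]
ddl = create_table_cql("xalperte_analysis", "capex_vol_6q", fields, ["asset"])
```

Note that a column whose type is not in the mapping is silently left out of the table, which also holds for `create_cassandra_model`.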

In [None]:
factors_df.write\
    .format("org.apache.spark.sql.cassandra")\
    .options(table="capex_vol_6q", keyspace="xalperte_analysis")\
    .option("confirm.truncate","true")\
    .mode("overwrite")\
    .partitionBy("astodate")\
    .save()

In [None]:
factors_data.set_index(['astodate', 'asset'], inplace=True)
display(factors_data)

In [None]:
import pandas as pd

unixt_factors_data = factors_data.set_index(pd.MultiIndex.from_tuples(
    [(x.timestamp(), y) for x, y in factors_data.index.values],
    names=['astodate', 'asset']))
display(unixt_factors_data)
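
What `x.timestamp()` returns for a value without time zone information depends on the type held in the index; a plain `datetime.datetime`, for instance, is interpreted in the machine's local time zone. The standard-library sketch below pins the conversion to UTC, assuming the `astodate` values are meant as UTC; `to_unix_utc` is an illustrative helper and the sample date is made up.

```python
from datetime import datetime, timezone


def to_unix_utc(moment):
    """Convert a datetime to a POSIX timestamp, treating naive values as UTC."""
    if moment.tzinfo is None:
        moment = moment.replace(tzinfo=timezone.utc)
    return moment.timestamp()


# Made-up index entry: (astodate, asset)
index = [(datetime(2018, 3, 31), "13773")]
unix_index = [(to_unix_utc(d), asset) for d, asset in index]
```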

In [None]:
sc.stop()