In [None]:
import sys, csv
import logging
import os

import requests
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import datediff, col, abs

In [None]:
# Papermill parameter: Airflow runs this notebook through Papermill and is
# expected to override `origin`. The cells guarded by `origin == 'jupyter'`
# (triggering and checking the Airflow DAG) only run when the notebook is
# executed interactively, because they are not needed under Airflow.
origin='jupyter'
# NOTE(review): the root logger defaults to WARNING, so this INFO message is
# presumably not displayed unless logging is configured elsewhere - confirm.
# NOTE(review): if Papermill injects its overrides in a cell placed after this
# one, the value logged here is the default, not the injected one - confirm.
logging.info('>>>>>>>>>>>>>>>>> {}'.format(origin))

In [3]:
# Build (or reuse) the Spark session used by every cell below.
#
# The cluster endpoints and the Cassandra credentials can be overridden with
# environment variables, so changing the target cluster no longer requires
# editing the notebook. The defaults are the values previously hardcoded here.
spark_master_url = os.environ.get("SPARK_MASTER_URL", "spark://172.20.0.9:7077")
cassandra_host = os.environ.get("CASSANDRA_HOST", "172.20.0.11")
cassandra_port = os.environ.get("CASSANDRA_PORT", "9042")
cassandra_username = os.environ.get("CASSANDRA_USERNAME", "cassandra")
cassandra_password = os.environ.get("CASSANDRA_PASSWORD", "cassandra")

spark = (
    SparkSession.builder.master(spark_master_url)
    .appName("extract_and_load")
    .config("spark.cassandra.connection.host", cassandra_host)
    .config("spark.cassandra.connection.port", cassandra_port)
    .config("spark.cassandra.auth.username", cassandra_username)
    .config("spark.cassandra.auth.password", cassandra_password)
    # Spark Cassandra connector, resolved as a package when the session starts.
    .config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.12:3.1.0")
    .getOrCreate()
)

#.config("spark.cassandra.output.ifNotExists", "true") \
#.config("spark.jars", "/opt/bitnami/spark/jars/spark-cassandra-connector-2.4.0-s_2.11.jar") \
#.config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.12:3.1.0") \
#.config("spark.jars.packages","com.datastax.spark:spark-cassandra-connector_2.12:3.2.0-beta,com.datastax.cassandra:cassandra-driver-core:3.11 spark-cassandra-connector-assembly-1.1.1-SNAPSHOT.jar")


In [4]:
def load_and_get_table_df(keys_space_name, table_name):
    """Load a Cassandra table as a Spark DataFrame.

    Uses the notebook-level ``spark`` session and the
    ``org.apache.spark.sql.cassandra`` data source.

    Args:
        keys_space_name: Name of the Cassandra keyspace that holds the table.
        table_name: Name of the table to load.

    Returns:
        The DataFrame produced by the reader's ``load()`` call.
    """
    reader = spark.read.format("org.apache.spark.sql.cassandra")
    reader = reader.options(table=table_name, keyspace=keys_space_name)
    return reader.load()

### Dependency
The Airflow DAG `cassandra_create_test_database_dag` must be run first to create the required objects (keyspace and tables) in Cassandra.

In [5]:
if origin == 'jupyter':
    # Trigger Dag cassandra_create_test_database_dag through the Airflow REST API.
    # The API base URL and the credentials can be overridden with environment
    # variables; the defaults match the values previously hardcoded here
    # (the old Basic header decoded to airflow:airflow).
    airflow_api_url = os.environ.get("AIRFLOW_API_URL", "http://172.20.0.4:8080/api/v1").rstrip("/")
    airflow_auth = (
        os.environ.get("AIRFLOW_USERNAME", "airflow"),
        os.environ.get("AIRFLOW_PASSWORD", "airflow"),
    )
    url = airflow_api_url + "/dags/cassandra_create_test_database_dag/dagRuns"

    payload="{\n    \"conf\": {\"dag_run_id\":\"cassandra_create_test_database_dag_via_jupyter\"}\n}"
    headers = {
      'Content-Type': 'application/json'
    }

    # A timeout keeps the notebook from hanging forever if Airflow is unreachable.
    response = requests.request("POST", url, headers=headers, data=payload, auth=airflow_auth, timeout=30)

    print(response.text)
    # Fail here, with the response body already printed, instead of letting a
    # rejected trigger go unnoticed until a later cell breaks.
    response.raise_for_status()

{
  "conf": {
    "dag_run_id": "cassandra_create_test_database_dag_via_jupyter"
  },
  "dag_id": "cassandra_create_test_database_dag",
  "dag_run_id": "manual__2022-02-19T16:43:55.131071+00:00",
  "end_date": null,
  "execution_date": "2022-02-19T16:43:55.131071+00:00",
  "external_trigger": true,
  "logical_date": "2022-02-19T16:43:55.131071+00:00",
  "start_date": null,
  "state": "queued"
}



In [6]:
if origin == 'jupyter':
    # Check status Dag. The Dag must have been executed and finished before the
    # cells below are run. This cell only prints the list of Dag runs; it does
    # not wait, so re-run it until the triggered run reports "success".
    # The API base URL and the credentials can be overridden with environment
    # variables; the defaults match the values previously hardcoded here.
    airflow_api_url = os.environ.get("AIRFLOW_API_URL", "http://172.20.0.4:8080/api/v1").rstrip("/")
    airflow_auth = (
        os.environ.get("AIRFLOW_USERNAME", "airflow"),
        os.environ.get("AIRFLOW_PASSWORD", "airflow"),
    )
    url = airflow_api_url + "/dags/cassandra_create_test_database_dag/dagRuns"

    # The GET carries no body: the previous version re-sent the `payload`
    # left over from the trigger cell, which also made this cell depend on it.
    response = requests.request("GET", url, auth=airflow_auth, timeout=30)

    print(response.text)
    # Surface HTTP errors (bad credentials, unknown Dag) instead of ignoring them.
    response.raise_for_status()

{
  "dag_runs": [
    {
      "conf": {},
      "dag_id": "cassandra_create_test_database_dag",
      "dag_run_id": "scheduled__2022-02-19T00:00:00+00:00",
      "end_date": "2022-02-19T15:15:49.990762+00:00",
      "execution_date": "2022-02-19T00:00:00+00:00",
      "external_trigger": false,
      "logical_date": "2022-02-19T00:00:00+00:00",
      "start_date": "2022-02-19T15:15:16.764480+00:00",
      "state": "success"
    },
    {
      "conf": {
        "dag_run_id": "cassandra_create_test_database_dag_via_jupyter"
      },
      "dag_id": "cassandra_create_test_database_dag",
      "dag_run_id": "manual__2022-02-19T15:33:50.637994+00:00",
      "end_date": "2022-02-19T15:34:24.675894+00:00",
      "execution_date": "2022-02-19T15:33:50.637994+00:00",
      "external_trigger": true,
      "logical_date": "2022-02-19T15:33:50.637994+00:00",
      "start_date": "2022-02-19T15:33:51.620533+00:00",
      "state": "success"
    },
    {
      "conf": {
        "dag_run_id": "cassandr

#### Write the DataFrame to Cassandra

In [7]:
# Read the employees file with the CSV reader, with the "header" option enabled.
csv_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .load("/usr/local/spark/data/previous_employees_by_job_title.txt")
)

In [8]:
# Keep only the columns that are written to Cassandra in the next cell.
write_df = csv_df.select(
    "job_title",
    "employee_id",
    "employee_name",
    "first_day",
    "last_day",
)

In [9]:
# Append the selected rows to test.previous_employees_by_job_title.
(
    write_df.write
    .format("org.apache.spark.sql.cassandra")
    .mode('append')
    .options(table="previous_employees_by_job_title", keyspace='test')
    .save()
)

#### Read from Cassandra, process, and write the result back

In [10]:
# Read the table back from Cassandra and print a preview of its rows.
previous_employees_df = load_and_get_table_df("test", "previous_employees_by_job_title")
previous_employees_df.show()

+-----------+--------------------+-----------------+-------------------+-------------------+
|  job_title|         employee_id|    employee_name|          first_day|           last_day|
+-----------+--------------------+-----------------+-------------------+-------------------+
|Electrician|001b08a1-8209-4ea...|     Elijah Young|2019-01-10 21:10:49|2000-01-21 11:06:55|
|Electrician|003d2f1f-0dfb-449...|     Tom Flanders|2017-01-09 06:33:15|2009-06-17 04:51:06|
|Electrician|0071613d-03e3-427...| Chester Richards|2005-11-26 00:04:24|2002-03-25 21:38:39|
|Electrician|00866b90-454f-45b...|    Logan Bayliss|2005-11-26 16:39:33|2001-04-11 04:51:08|
|Electrician|00c257d8-be9a-440...|   Marvin Dickson|2010-07-24 09:43:13|2015-01-08 05:51:51|
|Electrician|00dc0c37-51f4-47c...|       Fred Addis|2005-05-14 16:27:50|2005-03-31 02:33:53|
|Electrician|00e57955-b31c-48f...|      Doug Notman|2013-05-05 21:36:22|2019-10-16 06:26:50|
|Electrician|00e8bf42-1efa-48f...|   Angelique Khan|2002-01-09 08:09:0

In [11]:
# Register the connector's catalog under the name "cassandra", then switch to
# its "test" keyspace so the following queries can use unqualified table names.
cassandra_catalog_class = "com.datastax.spark.connector.datasource.CassandraCatalog"
spark.conf.set("spark.sql.catalog.cassandra", cassandra_catalog_class)
spark.sql("use cassandra.test")

DataFrame[]

In [12]:
# days_worked is the absolute day difference between last_day and first_day.
# The adjacent literals concatenate to the exact same single-line SQL statement.
calcDF = spark.sql(
    "select job_title, employee_id, employee_name, "
    "abs(datediff(last_day, first_day)) as days_worked "
    "from previous_employees_by_job_title"
)

In [13]:
# Print a preview of the computed days_worked rows.
calcDF.show()

+---------------+--------------------+----------------+-----------+
|      job_title|         employee_id|   employee_name|days_worked|
+---------------+--------------------+----------------+-----------+
|Project Manager|004eb7c3-dd89-448...| Camila Saunders|       3042|
|Project Manager|005e3aaa-0b5b-4f4...| Domenic Bennett|       1450|
|Project Manager|00842a72-b91a-4cf...|    Hayden Boden|       1896|
|Project Manager|00a24ee1-8872-4d1...| Rachael Pearson|         75|
|Project Manager|00d8069f-6b31-4c1...|     Johnny Wild|        531|
|Project Manager|011a1771-060f-44c...|    Zara Shields|       1371|
|Project Manager|01684fcd-f649-439...|    Adalind Moss|       4042|
|Project Manager|017488fd-2913-493...|     Rick Wright|       2875|
|Project Manager|01a0c7fd-9b91-4fa...|Bethany Harrison|       1686|
|Project Manager|01d3c729-2318-450...|    Bree Thomson|       1747|
|Project Manager|01dcf3f1-daa1-420...|  Tiffany Morris|        657|
|Project Manager|023d10ff-5d8d-43a...|   Melanie

In [14]:
# Append the computed rows to test.days_worked_by_previous_employees_by_job_title.
(
    calcDF.write
    .format("org.apache.spark.sql.cassandra")
    .mode('append')
    .options(table="days_worked_by_previous_employees_by_job_title", keyspace='test')
    .save()
)

In [15]:
# Stop the Spark session created at the top of the notebook.
spark.stop()