In [1]:
import json
import os
import socket
import sys
import time
import uuid
from datetime import datetime, timedelta, timezone

import boto3
import requests
from boto3.dynamodb.conditions import Attr, Key
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import explode


In [2]:
# MinIO credentials are read from the environment so they never appear in the notebook.
key, secret = os.environ["MINIO_ACCESS_KEY"], os.environ["MINIO_SECRET_KEY"]

# Endpoint used for both the Spark S3A settings and the boto3 client below.
endpoint = "http://minio:9000"
print(endpoint)

http://minio:9000


In [3]:
# Build (or reuse) the Spark session. Settings are applied in the order listed.
# NOTE(review): master is "local", so the spark.kubernetes.* settings presumably
# have no effect in this session - confirm before relying on them.
spark = (
    SparkSession.builder
    .master("local")
    .appName("nino")
    # S3A connector: credentials and endpoint come from the MinIO config cell.
    .config("spark.hadoop.fs.s3a.access.key", key)
    .config("spark.hadoop.fs.s3a.secret.key", secret)
    .config("spark.hadoop.fs.s3a.endpoint", endpoint)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    # Packages resolved at session start: Delta Lake, hadoop-aws and the AWS SDK bundle.
    .config(
        "spark.jars.packages",
        "io.delta:delta-core_2.12:0.7.0,org.apache.hadoop:hadoop-aws:3.2.0,com.amazonaws:aws-java-sdk-bundle:1.11.375",
    )
    # Delta Lake: log store, SQL extension and catalog implementation.
    .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # Deployment / Kubernetes settings.
    .config("spark.submit.deployMode", "client")
    .config("spark.kubernetes.container.image", "spark:spark-docker")
    .config("spark.kubernetes.pyspark.pythonVersion", "3")
    .config("spark.kubernetes.authenticate.driver.serviceAccountName", "default")
    .config("spark.executor.instances", "1")
    # The MinIO endpoint above is plain http, so SSL is switched off for S3A.
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config("spark.kubernetes.executor.request.cores", "0.5")
    .config("spark.kubernetes.executor.limit.cores", "1")
    # NOTE(review): the next two keys are outside the spark.* namespace; presumably
    # they are deployment-chart values - confirm whether Spark reads them at all.
    .config("jupyterService.jupyterPort_create_prop", "30888")
    .config("serviceAccount", "spark")
    .getOrCreate()
)

In [4]:
# Keep a handle on the SparkContext that backs the session.
sc = spark.sparkContext

# Create delta lake tables

In [5]:
# Bucket roots (s3a:// URIs).
delta_path = "s3a://delta/"
lake_path = "s3a://lake/"

# Locations inside the lake bucket, derived from its root.
json_path = lake_path + "wallstreetbets/"
output_path = lake_path + "outputs/"

# Names used when registering the Delta table in the catalog.
database = "reddit"
delta_table_name = "wallstreetbets"

# Create list of all JSON files from the last hour

In [6]:
# boto3 resource pointed at the MinIO endpoint, using the credentials read earlier.
s3 = boto3.resource(
    "s3",
    aws_access_key_id=key,
    aws_secret_access_key=secret,
    endpoint_url=endpoint,
)

mybucket = s3.Bucket("lake")
# If a blank prefix is given, every object in the bucket is returned.
# NOTE(review): object keys normally carry no leading "/"; confirm this prefix
# matches the keys under json_path on the target store.
bucket_prefix = "/wallstreetbets"
objs = mybucket.objects.filter(Prefix=bucket_prefix)

# Compute the cutoff once, as a timezone-aware UTC instant. Comparing aware
# datetimes avoids mixing the host's local clock with the store's timestamps,
# and a single cutoff keeps the window identical for every object in the listing.
cutoff = datetime.now(timezone.utc) - timedelta(hours=1)

files_last_hour = []
for obj in objs:
    if obj.last_modified > cutoff:
        print(obj.key)
        print(obj.last_modified)
        # lake_path already ends with "/", so join without doubling the separator.
        files_last_hour.append(lake_path.rstrip("/") + "/" + obj.key.lstrip("/"))

In [96]:
# Stop with an explicit message when the listing cell found nothing to read.
# NOTE(review): Spark presumably fails on an empty path list anyway (no schema
# can be inferred); this guard only makes the cause obvious - confirm.
if not files_last_hour:
    raise ValueError(
        "files_last_hour is empty: no objects modified in the last hour matched the prefix"
    )

# Read all recent JSON files into a single DataFrame; the schema is inferred.
data = spark.read.json(files_last_hour)

In [97]:
# Promote the fields of the top-level "data" struct to columns.
children = data.select("data.*")

# One row per element of the "children" array; explode() names its output
# column "col", whose nested "data" struct is then flattened into columns.
exploded = children.select(explode("children"))
df = exploded.select("col.data.*")

df.printSchema()

root
 |-- all_awardings: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- award_sub_type: string (nullable = true)
 |    |    |-- award_type: string (nullable = true)
 |    |    |-- awardings_required_to_grant_benefits: long (nullable = true)
 |    |    |-- coin_price: long (nullable = true)
 |    |    |-- coin_reward: long (nullable = true)
 |    |    |-- count: long (nullable = true)
 |    |    |-- days_of_drip_extension: long (nullable = true)
 |    |    |-- days_of_premium: long (nullable = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- end_date: string (nullable = true)
 |    |    |-- giver_coin_reward: long (nullable = true)
 |    |    |-- icon_format: string (nullable = true)
 |    |    |-- icon_height: long (nullable = true)
 |    |    |-- icon_url: string (nullable = true)
 |    |    |-- icon_width: long (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- is_enabled: boolean (nullable = t

In [98]:
# Preview the flattened rows as a text table. Every column is printed,
# so the output is very wide.
df.show()


+--------------------+-------------------+---------------+-----------+--------+--------------------+-----------------------------+----------------------+---------------------+------------------------+--------------------+-----------------------+-----------------+---------------+--------------------+--------------+--------+-------------+---------+--------+------------+--------+-------+------------------+------------+-------------+-------------+---------------+-------------+-------------------+-----+-------------+------+------------------+------+----------+------+----------------------+----------------+-------+-------------------+----------------------+------------------+-------+--------+-----+---------------------------+--------------------+--------------------+----------------------+------------------+---------------------+---------------+------+--------------------+----------+--------+-------------+----------------+-----------+---------+---------+------------+--------------+----------

In [99]:
# Persist the flattened rows in Delta format at delta_path, overwriting
# what is already stored there; mergeSchema is enabled for the write.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", True)
    .save(delta_path)
)

In [100]:
# Make sure the database exists before registering a table in it.
create_database_sql = "CREATE DATABASE IF NOT EXISTS {}".format(database)
spark.sql(create_database_sql)

# Register a table whose LOCATION is the Delta directory written earlier.
create_table_sql = """
        CREATE TABLE IF NOT EXISTS {}.{}
        USING DELTA
        LOCATION "{}"
        """.format(database, delta_table_name, delta_path)
spark.sql(create_table_sql)

DataFrame[]

# Query the Delta Lake table

In [101]:
# List the column names and data types of the Delta table stored at delta_path.
describe_sql = "DESCRIBE FORMATTED delta.`{}`".format(delta_path)
spark.sql(describe_sql).show()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|       all_awardings|array<struct<awar...|       |
| allow_live_comments|             boolean|       |
|     approved_at_utc|              string|       |
|         approved_by|              string|       |
|            archived|             boolean|       |
|              author|              string|       |
|author_flair_back...|              string|       |
|author_flair_css_...|              string|       |
|author_flair_rich...|array<struct<e:st...|       |
|author_flair_temp...|              string|       |
|   author_flair_text|              string|       |
|author_flair_text...|              string|       |
|   author_flair_type|              string|       |
|     author_fullname|              string|       |
|author_patreon_flair|             boolean|       |
|      author_premium|             boolean|       |
|           

# Select distinct posts from the Delta table and export them as JSON

In [102]:
# Project the columns to export and stamp each row with the query's current
# timestamp. SELECT DISTINCT already de-duplicates the projected rows, so no
# additional DataFrame.distinct() pass is applied on top of it.
output_df = spark.sql(
    "select distinct CURRENT_TIMESTAMP as timestamp, title, selftext, url from delta.`{}`".format(delta_path)
)
output_df.show()

# Replace the previous export at output_path with the current selection.
output_df.write.mode('overwrite').json(output_path)

+--------------------+--------------------+--------------------+--------------------+
|           timestamp|               title|            selftext|                 url|
+--------------------+--------------------+--------------------+--------------------+
|2021-06-08 18:21:...|Whether it be sni...|                    |https://i.redd.it...|
|2021-06-08 18:21:...|$WISH 250K YOLO. ...|                    |https://i.redd.it...|
|2021-06-08 18:21:...|$CLNE to the mooo...|                    |https://v.redd.it...|
|2021-06-08 18:21:...|$CLOV - Your Luck...|Dear all baboons ...|https://www.reddi...|
|2021-06-08 18:21:...|Because I’m a BBe...|                    |https://i.redd.it...|
|2021-06-08 18:21:...|$BB The Boomer Th...|It's no secret, w...|https://www.reddi...|
|2021-06-08 18:21:...|Cow Farts Go BRRR...|**OK APES IT'S BE...|https://www.reddi...|
|2021-06-08 18:21:...|Holy shit guys I ...|                    |https://i.redd.it...|
|2021-06-08 18:21:...|$WISH it to the M...|           

In [103]:
# Total number of rows in the Delta table. count(*) returns exactly one row,
# so a DISTINCT on top of it would not change the result.
spark.sql("select count(*) from delta.`{}`".format(delta_path)).show()

+--------+
|count(1)|
+--------+
|     270|
+--------+



In [104]:
# Number of distinct (timestamp, title, selftext, url) rows selected for export.
output_df.count()

28

# Ingest to Druid

In [105]:
# Header sent with the ingestion request: the body is JSON.
headers = {'Content-type': 'application/json'}

# Druid parallel batch ingestion task that loads the exported JSON files.
# Boolean options are written as Python booleans so they serialize to JSON
# true/false instead of the strings "true"/"false".
ingest_spec = {
    "type": "index_parallel",
    "spec": {
        "ioConfig": {
            "type": "index_parallel",
            # Read every object under the export prefix.
            "inputSource": {
                "type": "s3",
                "prefixes": [
                    "s3://lake/outputs"
                ]
            },
            "inputFormat": {
                "type": "json"
            },
            # Add to the existing datasource rather than replacing it.
            "appendToExisting": True
        },
        "tuningConfig": {
            "type": "index_parallel",
            "partitionsSpec": {
                "type": "dynamic"
            }
        },
        "dataSchema": {
            "dataSource": "wallstreetbets",
            # The "timestamp" column of the export is parsed as ISO 8601.
            "timestampSpec": {
                "column": "timestamp",
                "format": "iso"
            },
            "dimensionsSpec": {
                "dimensions": [
                    "selftext",
                    "title",
                    "url"
                ]
            },
            "granularitySpec": {
                "queryGranularity": "none",
                "rollup": False,
                "segmentGranularity": "hour"
            }
        }
    }
}

In [106]:
# Task-submission endpoint on the Druid router.
router = 'http://router:8888'
api_url = 'druid/indexer/v1/task'
druid_task_api = router + '/' + api_url

# The context manager releases the session's connections once the request is done.
# A timeout keeps the cell from hanging indefinitely if the router is unreachable.
# verify=False was dropped: the URL is plain http, so TLS verification never applied.
with requests.Session() as session:
    r = session.post(
        druid_task_api,
        data=json.dumps(ingest_spec),
        headers=headers,
        timeout=30,
    )

# Fail the cell on a 4xx/5xx response instead of silently showing the code.
r.raise_for_status()
r.status_code

200