In [5]:
from pyspark.sql.functions import *
#from pyspark.sql.functions import col

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Raiders of the lost kek

The dataset comes from an academic research project. It is semi-structured, that is it has a structure but it is also nested. The code below opens the raw file (already on S3) and shows the schema it found.

In [6]:
input_bucket = 's3://raiders-lost-kek-us-west-2'
input_path = '/pol_062016-112019_labeled/pol_062016-112019_labeled.ndjson'
df = spark.read.json(input_bucket + input_path)
df.printSchema()


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- posts: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- archived: long (nullable = true)
 |    |    |-- archived_on: long (nullable = true)
 |    |    |-- bumplimit: long (nullable = true)
 |    |    |-- capcode: string (nullable = true)
 |    |    |-- closed: long (nullable = true)
 |    |    |-- com: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- country_name: string (nullable = true)
 |    |    |-- entities: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- entity_end: long (nullable = true)
 |    |    |    |    |-- entity_label: string (nullable = true)
 |    |    |    |    |-- entity_start: long (nullable = true)
 |    |    |    |    |-- entity_text: string (nullable = true)
 |    |    |-- entitites: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- ext: string (nullable = true)
 |    |    

Let's wrangle the data a bit by "remembering" the original post for each thread, and also having a measure of thread size available at each post level. Let's convert the array of posts into separate rows, one for each thread.

In [7]:
df2 = df.withColumn("ori_post", df.posts[0].no).withColumn("replies_top", df.posts[0].replies+1)
df3 = df2.select("ori_post",
                 "replies_top",
                 explode(df2.posts).alias("posts_exp")
                 )


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Next, here is a list of desirable columns from the data dictionary document, which I converted from PDF to XLS. The prefix 'posts_exp' is there to account for the field name above.

In [8]:
my_select = ['posts_exp.no',
 'posts_exp.resto',
 'posts_exp.sticky',
 'posts_exp.closed',
 'posts_exp.now',
 'posts_exp.time',
 'posts_exp.name',
 'posts_exp.trip',
 'posts_exp.id',
 'posts_exp.capcode',
 'posts_exp.country',
 'posts_exp.country_name',
 'posts_exp.sub',
 'posts_exp.com',
 'posts_exp.tim',
 'posts_exp.filename',
 'posts_exp.ext',
 'posts_exp.fsize',
 'posts_exp.md5',
 'posts_exp.w',
 'posts_exp.h',
 'posts_exp.tn_w',
 'posts_exp.tn_h',
 'posts_exp.filedeleted',
 'posts_exp.spoiler',
 'posts_exp.custom_spoiler',
 'posts_exp.replies',
 'posts_exp.images',
 'posts_exp.bumplimit',
 'posts_exp.imagelimit',
 'posts_exp.semantic_url',
 'posts_exp.since4pass',
 'posts_exp.unique_ips',
 'posts_exp.m_img',
 'posts_exp.archived',
 'posts_exp.archived_on',
 'posts_exp.extracted_poster_id',
 'posts_exp.troll_country']


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Next, let's make sure our final dataframe contains only valid columns.

In [9]:
my_new_select = []
for c in my_select:
    try:
        df3.select(c)
        my_new_select.append(c)
    except:
        print(f"{c} not found in schema.")
my_final_select = ["ori_post", "replies_top"] + my_new_select + ["posts_exp.perspectives", "posts_exp.entities"]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

posts_exp.custom_spoiler not found in schema.

Let's add partitioning to the data, so that people can run query against individual years / months and scan faster.

In [10]:
df4 = (df3.select(my_final_select)
       .withColumn("full_date", (from_unixtime("time")))
       .withColumn("year",  year(from_unixtime("time")))
       .withColumn("month", month(from_unixtime("time")))
       .withColumn("day",   dayofmonth(from_unixtime("time")))
      ).repartition("year", "month", "day")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Trying out different formats and partitioning schemes

In all cases, we want to register the result in Glue.

First let's stick with JSON to be able to inspect the files externally. The job seems to fail with an error message at the end, after about 5 minutes. The files have all been written to S3, as the next cell shows

There are 1198 files, so the data seems complete.

If you run the Glue crawler on it, the table gets registered properly. This remains to be investigated further.

In any case we want to save the data in a binary, more efficient format, see further below.

In [12]:
(df4.write
 .option("path", f"{input_bucket}/raiders-lost-kek/raiders-training-json-part/")
 .option("mode", "overwrite")
 .partitionBy("year", "month", "day")
 .saveAsTable("raiders.raiders_partitioned", mode='overwrite', format='json')
)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

An error was encountered:
Can not create a Path from an empty string
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 868, in saveAsTable
    self._jwrite.saveAsTable(name)
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 134, in deco
    raise_from(converted)
  File "<string>", line 3, in raise_from
pyspark.sql.utils.IllegalArgumentException: Can not create a Path from an empty string



# Parquet Format

Next, we write the same dataframe as a parquet file with snappy compression. It takes about 5 minutes to do that.

In [9]:
(df4.write
 .option("path", f"{input_bucket}/raiders-lost-kek/raiders-training-parquet-part/")
 .partitionBy("year", "month", "day")
 .saveAsTable("raiders.kek", mode='overwrite', format='parquet')
)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

The previous code created 1198 files, one for each day with data. The progress bar says 905 tasks, these don't seem to map to partitions then.


In [2]:
spark.sql("""
SELECT COUNT(DISTINCT(DATE_TRUNC('day', from_unixtime(time))))
FROM raiders.kek
""").show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------------------------------------------------------------------------------+
|count(DISTINCT date_trunc(day, CAST(from_unixtime(time, yyyy-MM-dd HH:mm:ss) AS TIMESTAMP)))|
+--------------------------------------------------------------------------------------------+
|                                                                                        1198|
+--------------------------------------------------------------------------------------------+

In [13]:
df4.rdd.getNumPartitions()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

905

In [None]:
(df4.write
 .option("path", f"{input_bucket}/raiders-lost-kek/raiders-training-partitioned/")
 .option("mode", "overwrite")
 .saveAsTable("raiders.raiders_partitioned", mode='overwrite')
)


## Word Counts, a simple text analysis tool

In [20]:
df5 = df4.select(explode(split(lower("com"), "[\W0-9]")).alias("term"))
df6 = df5.groupBy("term").count().sort("count", ascending=False)



VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+----------+
|        term|     count|
+------------+----------+
|            |4664059498|
|           a| 300423377|
|          gt| 280818845|
|          br| 264087249|
|       class| 162438118|
|         the| 119401160|
|           p| 117468840|
|        href| 117054538|
|   quotelink| 117031344|
|        span|  89577138|
|          to|  73549941|
|         and|  65421255|
|          of|  53657807|
|         you|  51742275|
|          is|  47302634|
|           i|  46351162|
|       quote|  44343516|
|          it|  41625328|
|          in|  39128534|
|        that|  38048157|
|           s|  30327598|
|           t|  27651877|
|         are|  26901296|
|        they|  25686032|
|         for|  25040658|
|        this|  23299513|
|        quot|  21222225|
|          be|  20398454|
|         not|  18060007|
|          on|  18029284|
|        have|  17801637|
|        with|  17760286|
|          he|  15036986|
|         but|  14875509|
|         was|  14324540|
|          w

In [21]:
(df6.write
 .option("path", f"{input_bucket}/raiders-lost-kek/raiders-terms/")
 .option("mode", "overwrite")
 .saveAsTable("raiders.terms", mode='overwrite', compression='gzip')
)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
df5

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2880642

In [None]:
spark.sql("""
describe table extended raiders.raiders_partitioned
""").

In the section below, remove F. prefix

In [None]:
df2=df.select(explode(df.posts).alias("posts"))
df3=df2.select(df2.posts.no.alias('number'),
              df2.posts.sub.alias('subject'),
              df2.posts.country.alias('country'),
              from_unixtime(df2.posts.time).alias('time'),
              df2.posts.country_name.alias('country_name'),
              df2.posts.troll_country.alias('troll_country'),
              df2.posts.com.alias('text'),
              df2.posts.replies.alias('replies'),
              )
#df3.show(10)
#df3.write.parquet(path=f"{input_bucket}/raiders-lost-kek/raiders-training-zstd/", mode='overwrite')


In [None]:
df3.write.option("path", f"{input_bucket}/raiders-lost-kek/raiders-training-zstd/").option("compression", "zstd").saveAsTable("raiders_zstd")

In [None]:
spark.sql("show tables in raiders like '^[^p].*'").show()

In [None]:
df = spark.read.parquet("s3://raiders-lost-kek-us-west-2/raiders-lost-kek/raiders-training/")
df.printSchema()


In [None]:
spark.sql("show tables").show()

In [None]:
df3.printSchema()

In [None]:
df2.write.parquet(path=f"{input_bucket}/raiders-lost-kek/raiders-new-from-ori", mode='overwrite',compression='zstd')
#.saveAsTable("raiders_new_from_ori", mode="overwrite")

In [None]:
spark.sql("show tables in raiders").show()


In [None]:
input_bucket

In [None]:
df3.write.saveAsTable("kek", mode="overwrite", format="parquet", path=f"{input_bucket}/raiders-lost-kek/raiders_2020/")

In [None]:
spark.sql("show tables in default").show()

In [None]:
spark.sql("""

select country_name, troll_country, count(*)/1e6 as count_in_million
from raiders.kek
group by 1, 2
order by 3 desc;

""").show()

In [None]:
spark.sql("""
with all_posts as
(
select explode(posts) as cols from raiders.kek
)
select cols.country, count(*) as count
from all_posts
group by cols.country
order by count desc
""").show()

In [None]:
df = spark.read.parquet("s3://raiders-lost-kek-us-west-2/raiders-lost-kek/raiders-new-from-ori")
sqlContext.registerDataFrameAsTable(df, "raiders")
spark.sql("describe table raiders")

In [None]:
spark.sql("""
select posts.country_name, count(*) as count
from raiders
where posts.country_name is not null
group by 1
order by count desc
""").show()

In [None]:
spark.sql("""
select count(*) as count
from raiders
where posts.country_name IS NULL
""").show()

In [None]:
spark.sql("show tables from raiders_2").show()

In [None]:
df2 = df.select(F.explode(df.Records).alias("exploded"))
#df2.groupby("exploded.eventType", "exploded.eventName").count().sort("eventName", ascending=True).show(250)
df3 = (df2.filter(df2.exploded.eventName=='StartQueryExecution')
          .filter(df2.exploded.ErrorCode.isNull())
          .filter(df2.exploded.requestParameters.queryString.rlike('^SHOW')==False)
         .select(df2.exploded.requestParameters.queryString.alias("queryString"),
                   df2.exploded.responseElements.queryExecutionId.alias("sessionId"),
                   df2.exploded.eventtime.alias("startTime"),
                   df2.exploded.eventtime.alias("sessionStartTime"),
                   df2.exploded.userIdentity.type.alias("userType"),
                   df2.exploded.userIdentity.userName.alias("userName"),
                   df2.exploded.userIdentity.principalid.alias("principalid"),
          ))
df4 = (df3.withColumn("defaultDatabases", F.lit(""))
    .withColumn("userName", F.when(df3.userType=="IAMUSer", df3.userName).otherwise(df3.userName)))
df4.show()
df4.write.saveAsTable("athena_qli", mode="overwrite", format="parquet", path=f"s3://com.alationpro/athena_qli_from_spark/")

In [None]:
#my_df = df4.toPandas()
my_df.to_parquet(path="s3://com.alationpro/athena_qli_from_spark_pd/", egine="auto")

In [None]:
import sys
sys.path

In [None]:
sys.executable

In [None]:
spark.sql("describe table athena_qli").show()

In [None]:
spark.sql("""
SELECT
  CASE CAST(useridentity.type AS VARCHAR)
  WHEN 'AssumedRole'
  THEN SUBSTR(useridentity.principalid, position(':' IN useridentity.principalid)+1)
  WHEN 'IAMUser'
  THEN useridentity.username
  ELSE useridentity.username
  END AS userName
  ,' ' AS defaultDatabases -- <--- changed from 0 to one space
  ,json_extract_scalar(responseElements, '$.queryExecutionId') AS sessionId
  ,eventtime AS sessionStartTime
  ,eventtime AS startTime
  ,json_extract_scalar(requestParameters, '$.queryString') AS queryString
  ,'null' AS milliseconds
  FROM default.athena_qli
  WHERE 1=1
  AND region = 'us-west-2'
  AND json_extract_scalar(requestParameters, '$.queryString') <> 'SHOW SCHEMAS'
  AND json_extract_scalar(requestParameters, '$.queryString') NOT LIKE 'SHOW TABLES IN%'
  AND json_extract_scalar(requestParameters, '$.queryString') <> 'SELECT 1'
  AND CAST(ErrorCode AS VARCHAR) IS NULL
  AND eventname = 'StartQueryExecution'
""")

In [None]:
df3

In [None]:
input_bucket = 's3://com.alationpro/'
input_path = '/big-data-training/event_history.json.gz'
df = spark.read.json(input_bucket + input_path)
df.printSchema()


In [None]:
input_bucket = 's3://aws-cloudtrail-logs-255149284406-4cec155e/'
input_path = 'AWSLogs/255149284406/CloudTrail/us-west-2/2020/11/26/255149284406_CloudTrail_us-west-2_20201126T1055Z_76IDD8USYxbV1OnC.json.gz'
#df = spark.read.json(input_bucket + input_path)
#df.printSchema()
df2 = df.select(F.explode(df.Records))
df3 = df2.groupby(col("col.eventName")).count()
df3.select(col("eventName"), col("count")).sort("count", ascending=False).show()