In [70]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.functions import expr, col
from pyspark.sql.window import Window

import json

#### Utils

In [71]:
# Reuse the active Spark session if there is one, otherwise start a new one
# with the application name "example".
spark = (
    SparkSession.builder
    .appName("example")
    .getOrCreate()
)

In [72]:
# Load the raw records. multiLine lets a single JSON document span several
# lines instead of requiring one JSON object per line.
# The columns shown below (key, offset, partition, timestamp, timestampType,
# topic, value) mirror a Kafka record layout.
df = (
    spark.read
    .option("multiLine", True)
    .json("example_data.json")
)

df.show(truncate=50)

+-----+------+---------+--------------------+-------------+-------------+--------------------------------------------------+
|  key|offset|partition|           timestamp|timestampType|        topic|                                             value|
+-----+------+---------+--------------------+-------------+-------------+--------------------------------------------------+
|0011$| 12345|        0|2024-05-16T12:34:56Z|            0|example_topic|data: {"$schema":"/mediawiki/revision/create/2....|
|1112#| 67890|        1|2024-05-16T13:45:30Z|            1|another_topic|id: [{"topic":"eqiad.mediawiki.page-create","pa...|
|2223=| 24680|        2|2024-05-16T14:56:12Z|            0|  topic_three|                                                  |
| 3334| 13579|        0|2024-05-16T15:07:45Z|            1|   test_topic|                                    event: message|
|@1111| 12345|        0|2024-05-16T12:34:56Z|            0|example_topic|data: {"$schema":"/mediawiki/revision/create/2....|


In [73]:
# Keep only the payload column as text. If the JSON reader already inferred
# `value` as a string, the cast is a no-op. It keeps the cell correct for
# sources that deliver `value` as binary (as Spark's Kafka source does).
value_as_text = col("value").cast("string").alias("value")
input_df = df.select(value_as_text)
input_df.show(truncate=100)

+----------------------------------------------------------------------------------------------------+
|                                                                                               value|
+----------------------------------------------------------------------------------------------------+
|data: {"$schema":"/mediawiki/revision/create/2.0.0","meta":{"uri":"https://commons.wikimedia.org/...|
|id: [{"topic":"eqiad.mediawiki.page-create","partition":0,"timestamp":1715790794001},{"topic":"co...|
|                                                                                                    |
|                                                                                      event: message|
|data: {"$schema":"/mediawiki/revision/create/2.0.0","meta":{"uri":"https://commons.wikimedia.org/...|
|data: {"$schema":"/mediawiki/revision/create/2.0.0","meta":{"uri":"https://zh.wiktionary.org/wiki...|
|data: {"$schema":"/mediawiki/revision/create/2.0.0","meta":{"uri":"https

In [74]:
# Keep only the "data: <json>" lines and strip the prefix so that what
# remains is the raw JSON payload. The other line kinds visible above
# ("id: ...", "event: message", empty lines) are dropped.
# A parenthesised chain replaces the old trailing-backslash continuation,
# which only parsed because a blank line happened to follow it.
DATA_PREFIX = "data: "

data_df = (
    input_df
    .filter(F.col("value").startswith(DATA_PREFIX))
    # SQL substring() is 1-based, so the payload starts right after the prefix.
    .select(F.expr(f"substring(value, {len(DATA_PREFIX) + 1})").alias("data"))
)

data_df.show(truncate=100)

+----------------------------------------------------------------------------------------------------+
|                                                                                                data|
+----------------------------------------------------------------------------------------------------+
|{"$schema":"/mediawiki/revision/create/2.0.0","meta":{"uri":"https://commons.wikimedia.org/wiki/F...|
|{"$schema":"/mediawiki/revision/create/2.0.0","meta":{"uri":"https://commons.wikimedia.org/wiki/C...|
|{"$schema":"/mediawiki/revision/create/2.0.0","meta":{"uri":"https://zh.wiktionary.org/wiki/Categ...|
|{"$schema":"/mediawiki/revision/create/2.0.0","meta":{"uri":"https://commons.wikimedia.org/wiki/C...|
|{"$schema":"/mediawiki/revision/create/2.0.0","meta":{"uri":"https://zh.wiktionary.org/wiki/Categ...|
|{"$schema":"/mediawiki/revision/create/2.0.0","meta":{"uri":"https://zh.wiktionary.org/wiki/Categ...|
|{"$schema":"/mediawiki/revision/create/2.0.0","meta":{"uri":"https://www

In [78]:
# Parse each JSON payload against an explicit DDL schema. The schema lists
# only the fields used downstream; from_json ignores every other key.
# NOTE(review): user_id is parsed as a 32-bit integer. Confirm ids can never
# exceed 2^31-1, otherwise consider bigint.
event_schema_ddl = (
    "struct<"
    "meta:struct<domain:string, request_id:string>, "
    "performer:struct<user_is_bot:boolean, user_id:integer>, "
    "dt:string, "
    "page_title:string>"
)

parsed_df = (
    data_df
    .select(F.from_json(col("data"), event_schema_ddl).alias("data"))
    .select("data.*")
)

parsed_df.printSchema()
parsed_df.show(truncate=40)

root
 |-- meta: struct (nullable = true)
 |    |-- domain: string (nullable = true)
 |    |-- request_id: string (nullable = true)
 |-- performer: struct (nullable = true)
 |    |-- user_is_bot: boolean (nullable = true)
 |    |-- user_id: integer (nullable = true)
 |-- dt: string (nullable = true)
 |-- page_title: string (nullable = true)

+----------------------------------------+----------------+--------------------+----------------------------------------+
|                                    meta|       performer|                  dt|                              page_title|
+----------------------------------------+----------------+--------------------+----------------------------------------+
|{commons.wikimedia.org, cedf5ea9-dc0a...|  {true, 910182}|2024-05-15T16:33:14Z|File:Church_of_St_Peter_^_St_Paul,_No...|
|{commons.wikimedia.org, 3d11fe0e-bdf2...|{false, 2826358}|2024-05-16T16:33:45Z|Category:Locations_along_Farm_to_Mark...|
|{zh.wiktionary.org, cc16864e-e914-42c...|   {t

In [79]:
# Keep only events from the listed domains whose performer is not flagged as
# a bot. If user_is_bot is null, the negation is also null, so filter() drops
# those rows too.
target_domains = ["en.wikipedia.org", "www.wikidata.org", "commons.wikimedia.org"]

is_target_domain = col("meta.domain").isin(target_domains)
is_not_bot = ~col("performer.user_is_bot")

filtered_df = parsed_df.filter(is_target_domain & is_not_bot)

filtered_df.show(truncate=50)

+--------------------------------------------------+----------------+--------------------+-------------------------------------------------+
|                                              meta|       performer|                  dt|                                       page_title|
+--------------------------------------------------+----------------+--------------------+-------------------------------------------------+
|{commons.wikimedia.org, 3d11fe0e-bdf2-4c41-b7e0...|{false, 2826358}|2024-05-16T16:33:45Z|Category:Locations_along_Farm_to_Market_Road_2540|
|{commons.wikimedia.org, 3d11fe0e-bdf2-4c41-b7e0...|{false, 2826358}|2024-05-16T16:33:45Z|Category:Locations_along_Farm_to_Market_Road_2540|
|{en.wikipedia.org, 18b12289-c3cd-4c7a-9c54-ce93...|  {false, 53191}|2024-05-16T16:32:59Z|                                Category:日語變體|
+--------------------------------------------------+----------------+--------------------+-------------------------------------------------+



In [80]:
# Flatten the nested structs into top-level columns.
# Mapping is output column name -> source field path, in output order.
output_columns = {
    "domain": "meta.domain",
    "request_id": "meta.request_id",
    "user_id": "performer.user_id",
    "timestamp": "dt",
    "page_title": "page_title",
}

selected_df = filtered_df.select(
    *[col(source).alias(name) for name, source in output_columns.items()]
)

selected_df.show(truncate=50)

+---------------------+------------------------------------+-------+--------------------+-------------------------------------------------+
|               domain|                          request_id|user_id|           timestamp|                                       page_title|
+---------------------+------------------------------------+-------+--------------------+-------------------------------------------------+
|commons.wikimedia.org|3d11fe0e-bdf2-4c41-b7e0-aeec9d97e734|2826358|2024-05-16T16:33:45Z|Category:Locations_along_Farm_to_Market_Road_2540|
|commons.wikimedia.org|3d11fe0e-bdf2-4c41-b7e0-aeec9d97e734|2826358|2024-05-16T16:33:45Z|Category:Locations_along_Farm_to_Market_Road_2540|
|     en.wikipedia.org|18b12289-c3cd-4c7a-9c54-ce937bba8b40|  53191|2024-05-16T16:32:59Z|                                Category:日語變體|
+---------------------+------------------------------------+-------+--------------------+-------------------------------------------------+



In [84]:
# Serialise every column of selected_df into one JSON string column named
# "value".
# - to_json already returns a string, so no extra CAST is needed.
# - Taking the field list from selected_df.columns keeps it in sync with the
#   projection in the previous cell.
# NOTE(review): a single string "value" column is the shape a Kafka sink
# expects; confirm that is the intended destination.
formatted_df = selected_df.select(
    F.to_json(F.struct(*selected_df.columns)).alias("value")
)

formatted_df.show(truncate=40)

+----------------------------------------+
|                                   value|
+----------------------------------------+
|{"domain":"commons.wikimedia.org","re...|
|{"domain":"commons.wikimedia.org","re...|
|{"domain":"en.wikipedia.org","request...|
+----------------------------------------+

