## Adding Spark Dependencies
* Kafka dependencies to connect to Kafka;
* ABRiS dependencies to deserialize and serialize Avro records registered in the Confluent Schema Registry;
* Confluent Schema Registry Client to connect to Schema Registry.

In [1]:
import os

# Spark's Kafka connectors (Scala 2.12 builds for Spark 3.2.1).
kafka_deps = ','.join([
    'org.apache.spark:spark-streaming-kafka-0-10_2.12:3.2.1',
    'org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1',
])
# Confluent Schema Registry client and Avro serializer (resolved via the Confluent repository below).
confluent_schema_deps = ','.join([
    'io.confluent:kafka-schema-registry-client:7.1.1',
    'io.confluent:kafka-avro-serializer:7.1.1',
])
# ABRiS, used below for from_avro / to_avro against the Schema Registry.
abris_deps = 'za.co.absa:abris_2.12:6.3.0'

all_packages = ','.join([kafka_deps, confluent_schema_deps, abris_deps])
spark_deps = (
    '--repositories https://packages.confluent.io/maven '
    f'--packages {all_packages} pyspark-shell'
)
print(spark_deps)
# Presumably read when PySpark launches the JVM, so this cell has to run
# before the SparkSession cell below.
os.environ['PYSPARK_SUBMIT_ARGS'] = spark_deps

--repositories https://packages.confluent.io/maven --packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.2.1,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1,io.confluent:kafka-schema-registry-client:7.1.1,io.confluent:kafka-avro-serializer:7.1.1,za.co.absa:abris_2.12:6.3.0 pyspark-shell


## Importing necessary libs

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import col, udf, struct
from pyspark import SparkContext
from pyspark.sql.column import Column, _to_java_column

## Starting Spark Context
If you check your terminal, you'll see Spark downloading the libraries from the repositories (Maven and Confluent — the latter added with the ```--repositories``` flag).

In [3]:
# Despite its name, this variable holds a SparkSession (used below for
# readStream); getOrCreate() returns an already-running session if present.
spark_context = (
    SparkSession.builder
    .appName('processingBots')
    .getOrCreate()
)

## Creating functions to parse Confluent Avro
Using examples from https://github.com/AbsaOSS/ABRiS/blob/master/documentation/python-documentation.md

In [4]:
def from_avro(col, config):
    """
    Deserialize an Avro-encoded column through ABRiS.

    Thin Python wrapper around the JVM function
    ``za.co.absa.abris.avro.functions.from_avro``, reached through the py4j
    gateway of the currently active SparkContext.

    :param col (PySpark column / str): column to decode, e.g. "key" or "value"
    :param config (za.co.absa.abris.config.FromAvroConfig): ABRiS config, e.g. built by from_avro_abris_config
    :return: PySpark Column wrapping the JVM column returned by ABRiS
    """
    abris_functions = SparkContext._active_spark_context._gateway.jvm.za.co.absa.abris.avro.functions
    decoded = abris_functions.from_avro(_to_java_column(col), config)
    return Column(decoded)

def from_avro_abris_config(config_map, topic, is_key):
    """
    Build an ABRiS FromAvroConfig that fetches the latest reader schema.

    The schema is looked up with ABRiS's topic-name strategy for ``topic``,
    against the Schema Registry described by ``config_map``.

    :param config_map (dict[str, str]): Schema Registry client settings, ex: {'schema.registry.url': 'http://localhost:8081'}
    :param topic (str): kafka topic whose registered schema is used
    :param is_key (bool): passed to the topic-name strategy; presumably True selects the key schema and False the value schema
    :return: za.co.absa.abris.config.FromAvroConfig
    """
    jvm = SparkContext._active_spark_context._gateway.jvm
    registry_conf = jvm.PythonUtils.toScalaMap(config_map)

    builder = jvm.za.co.absa.abris.config.AbrisConfig.fromConfluentAvro()
    builder = builder.downloadReaderSchemaByLatestVersion()
    builder = builder.andTopicNameStrategy(topic, is_key)
    return builder.usingSchemaRegistry(registry_conf)

## Reading the Kafka topic as a Spark stream

In [5]:
starting_offsets = 'earliest'

# Streaming source over the `wikipedia.parsed` topic; key/value arrive as
# raw binary until they are decoded further down.
kafka_reader = (
    spark_context.readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', 'localhost:12091')
    .option('subscribe', 'wikipedia.parsed')
    .option("startingOffsets", starting_offsets)
)
wikipedia_df = kafka_reader.load()

In [6]:
schema_banner = 'Before parsing schema, this is the default schema from Kafka Topics'
print(schema_banner)
wikipedia_df.printSchema()

Before parsing schema, this is the default schema from Kafka Topics
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



### Configuring the Confluent Avro parser (SSL and Basic Auth)

In [7]:
# Schema Registry client settings (SSL + HTTP Basic Auth).
# Secrets and the keystore directory can be overridden through environment
# variables so they don't have to live in the notebook; each fallback
# reproduces the value that used to be hardcoded here.
# `os` is imported in the dependencies cell at the top of the notebook.
security_dir = os.environ.get('SCHEMA_REGISTRY_SECURITY_DIR', '/home/jovyan/work/scripts/security')
confluent_config = {
    'schema.registry.url': os.environ.get('SCHEMA_REGISTRY_URL', 'https://localhost:8085'),
    'schema.registry.ssl.truststore.location': f'{security_dir}/kafka.schemaregistry.truststore.jks',
    'schema.registry.ssl.truststore.password': os.environ.get('SCHEMA_REGISTRY_TRUSTSTORE_PASSWORD', 'confluent'),
    'schema.registry.ssl.keystore.location': f'{security_dir}/kafka.schemaregistry.keystore.jks',
    'schema.registry.ssl.keystore.password': os.environ.get('SCHEMA_REGISTRY_KEYSTORE_PASSWORD', 'confluent'),
    'basic.auth.credentials.source': 'USER_INFO',
    'basic.auth.user.info': os.environ.get('SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO', 'superUser:superUser'),
}
from_avro_abris_settings = from_avro_abris_config(confluent_config, 'wikipedia.parsed', False)
# Not idempotent: this replaces the binary `value` with its decoded struct, so
# re-running the cell would try to decode an already-decoded column.
# Re-run from the readStream cell instead.
wikipedia_df = wikipedia_df.withColumn('value', from_avro(col('value'), from_avro_abris_settings))

In [8]:
schema_banner = 'Schema after parsing:'
print(schema_banner)
wikipedia_df.printSchema()

Schema after parsing:
root
 |-- key: binary (nullable = true)
 |-- value: struct (nullable = true)
 |    |-- bot: boolean (nullable = true)
 |    |-- comment: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- length: struct (nullable = true)
 |    |    |-- new: long (nullable = true)
 |    |    |-- old: long (nullable = true)
 |    |-- log_action: string (nullable = true)
 |    |-- log_action_comment: string (nullable = true)
 |    |-- log_id: long (nullable = true)
 |    |-- log_type: string (nullable = true)
 |    |-- meta: struct (nullable = false)
 |    |    |-- domain: string (nullable = true)
 |    |    |-- dt: timestamp (nullable = false)
 |    |    |-- id: string (nullable = false)
 |    |    |-- request_id: string (nullable = true)
 |    |    |-- stream: string (nullable = false)
 |    |    |-- uri: string (nullable = true)
 |    |-- minor: boolean (nullable = true)
 |    |-- namespace: long (nullable = true)
 |    |-- parsedcomment: string (nullable = tr

### Filtering bots and sending to Topics
Original ksqlDB queries reproduced below in Spark:

No-bots query:
```sql
CREATE STREAM wikipedianobot AS SELECT *, (length->new - length->old) AS BYTECHANGE FROM wikipedia WHERE bot = false AND length IS NOT NULL AND length->new IS NOT NULL AND length->old IS NOT NULL;
```
Bots Query:
```sql
CREATE STREAM wikipediabot AS SELECT *, (length->new - length->old) AS BYTECHANGE FROM wikipedia WHERE bot = true AND length IS NOT NULL AND length->new IS NOT NULL AND length->old IS NOT NULL;
```

In [9]:
# BYTECHANGE mirrors ksqlDB's (length->new - length->old), appended as the
# last field of the `value` struct.
# Not idempotent: re-running this cell would append a second BYTECHANGE field.
wikipedia_df = wikipedia_df.withColumn(
    'value',
    struct('value.*', (col('value.length.new') - col('value.length.old')).alias('BYTECHANGE')),
)

# Same predicate as the ksqlDB queries: `length` and both of its fields must be present.
has_length = (
    col('value.length').isNotNull()
    & col('value.length.new').isNotNull()
    & col('value.length.old').isNotNull()
)
# A null `bot` yields a null condition, so such rows land in neither stream
# (same as `bot = true` / `bot = false` in ksqlDB).
bots_df = wikipedia_df.filter(col('value.bot') & has_length)
no_bots_df = wikipedia_df.filter(~col('value.bot') & has_length)

### Serializing the value column to Confluent Avro before writing back to Kafka

In [10]:
def to_avro(col, config):
    """
    Serialize a column to Avro through ABRiS.

    Thin Python wrapper around the JVM function
    ``za.co.absa.abris.avro.functions.to_avro``, reached through the py4j
    gateway of the currently active SparkContext.

    :param col (PySpark column / str): column to encode, e.g. "key" or "value"
    :param config (za.co.absa.abris.config.ToAvroConfig): ABRiS config, e.g. built by to_avro_abris_config
    :return: PySpark Column wrapping the JVM column returned by ABRiS
    """
    abris_functions = SparkContext._active_spark_context._gateway.jvm.za.co.absa.abris.avro.functions
    encoded = abris_functions.to_avro(_to_java_column(col), config)
    return Column(encoded)

def to_avro_abris_config(config_map, topic, is_key):
    """
    Build an ABRiS ToAvroConfig that fetches the latest registered schema.

    The schema is looked up with ABRiS's topic-name strategy for ``topic``,
    against the Schema Registry described by ``config_map``.

    :param config_map (dict[str, str]): Schema Registry client settings, ex: {'schema.registry.url': 'http://localhost:8081'}
    :param topic (str): kafka topic whose registered schema is used for writing
    :param is_key (bool): passed to the topic-name strategy; presumably True selects the key schema and False the value schema
    :return: za.co.absa.abris.config.ToAvroConfig
    """
    jvm = SparkContext._active_spark_context._gateway.jvm
    registry_conf = jvm.PythonUtils.toScalaMap(config_map)

    builder = jvm.za.co.absa.abris.config.AbrisConfig.toConfluentAvro()
    builder = builder.downloadSchemaByLatestVersion()
    builder = builder.andTopicNameStrategy(topic, is_key)
    return builder.usingSchemaRegistry(registry_conf)

In [11]:
# Keep only `value`, the column that is serialized and written to Kafka below.
no_bots_df, bots_df = (frame.select(col('value')) for frame in (no_bots_df, bots_df))

In [12]:
# Serialize each stream with the schema looked up (via the topic-name strategy)
# for the topic it is actually written to, instead of using the WIKIPEDIABOT
# schema for both streams.
# NOTE(review): assumes the WIKIPEDIANOBOT value subject already exists in the
# registry (e.g. created by the ksqlDB query above) — confirm.
to_avro_abris_settings = to_avro_abris_config(confluent_config, 'WIKIPEDIABOT', False)
no_bots_to_avro_abris_settings = to_avro_abris_config(confluent_config, 'WIKIPEDIANOBOT', False)
no_bots_df = no_bots_df.withColumn('value', to_avro(col('value'), no_bots_to_avro_abris_settings))
bots_df = bots_df.withColumn('value', to_avro(col('value'), to_avro_abris_settings))

### Writing back to Kafka 

In [13]:
# Every streaming query needs its own checkpoint directory; the bots and
# no-bots sinks must not share one.
bots_query = (
    bots_df.writeStream.format('kafka')
    .option("kafka.bootstrap.servers", 'localhost:12091')
    .option("checkpointLocation", "../checkpoints/wikipediabot/")
    .option("topic", "WIKIPEDIABOT")
    .outputMode("append")
    .start()
)
# Keep the handle so the query can be inspected or stopped later; also displayed as the cell output.
bots_query

<pyspark.sql.streaming.StreamingQuery at 0x7f8cf0e43730>

In [14]:
# Every streaming query needs its own checkpoint directory; the no-bots and
# bots sinks must not share one.
no_bots_query = (
    no_bots_df.writeStream.format('kafka')
    .option("kafka.bootstrap.servers", 'localhost:12091')
    .option("checkpointLocation", "../checkpoints/wikipedianobot/")
    .option("topic", "WIKIPEDIANOBOT")
    .outputMode("append")
    .start()
)
# Keep the handle so the query can be inspected or stopped later; also displayed as the cell output.
no_bots_query

<pyspark.sql.streaming.StreamingQuery at 0x7f8cf0fbc070>