In [1]:
import findspark
# Make the local Spark installation's `pyspark` package importable.
# This must run before `import pyspark` below.
findspark.init()

import pyspark
# Return the Spark home that findspark located (echoed as this cell's output).
findspark.find()

'C:\\spark-3.3.1-bin-hadoop2\\spark-3.3.1-bin-hadoop2'

In [2]:
import functools
import os

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession, SQLContext

from pyspark.sql.functions import *
from pyspark.sql import Row

# Initialize Spark

In [3]:
# Maven packages Spark resolves at session start-up:
# - the MongoDB connector that provides the "mongodb" stream source;
# - the PostgreSQL JDBC driver used by the "jdbc" sink.
# NOTE(review): MongoDB Spark Connector 10.0.x appears to support only continuous-mode
# streaming reads; micro-batch triggers such as `processingTime` and `foreachBatch`
# arrived in 10.1. Confirm before relying on micro-batch sinks.
SPARK_PACKAGES = [
    'org.mongodb.spark:mongo-spark-connector:10.0.0',
    'org.postgresql:postgresql:42.5.0',
]

spark = (
    SparkSession.builder
    .appName("streamingExampleRead")
    .config('spark.jars.packages', ','.join(SPARK_PACKAGES))
    .getOrCreate()
)

spark

# Read Stream From MongoDB

In [4]:
# Where the raw player documents live.
MONGO_URI = 'mongodb://localhost:27017'
MONGO_DATABASE = 'football_data_new'
MONGO_COLLECTION = 'player'

# Unbounded streaming DataFrame over the player collection.
# Its schema is inferred by the connector and printed below.
player_stream_df = (
    spark.readStream.format("mongodb")
    .option("spark.mongodb.connection.uri", MONGO_URI)
    .option('spark.mongodb.database', MONGO_DATABASE)
    .option('spark.mongodb.collection', MONGO_COLLECTION)
    .load()
)

player_stream_df.printSchema()

root
 |-- _id: string (nullable = true)
 |-- info: struct (nullable = true)
 |    |-- ShortName: string (nullable = true)
 |    |-- FullName: string (nullable = true)
 |    |-- Position: string (nullable = true)
 |    |-- Height: string (nullable = true)
 |    |-- Weight: string (nullable = true)
 |    |-- Footed: string (nullable = true)
 |    |-- DOB: string (nullable = true)
 |    |-- Nationality: string (nullable = true)
 |    |-- Club: string (nullable = true)
 |-- stats: struct (nullable = true)
 |    |-- adv_goalkeeping: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- #OPA: integer (nullable = true)
 |    |    |    |-- #OPA/90: double (nullable = true)
 |    |    |    |-- /90: double (nullable = true)
 |    |    |    |-- 90s: double (nullable = true)
 |    |    |    |-- Age: integer (nullable = true)
 |    |    |    |-- Att: integer (nullable = true)
 |    |    |    |-- AvgDist: double (nullable = true)
 |    |    |    |-- AvgLen:

# Process Data

In [5]:
# Schema of the nested `info` struct (ShortName, FullName, Position, ...).
info_table_schema = player_stream_df.schema['info'].dataType

# `stats` is a struct whose fields are arrays of structs (e.g. `adv_goalkeeping`).
# Record, per field name, the schema of a single array element.
stats_struct = player_stream_df.schema['stats'].dataType
stats_table_names = stats_struct.fieldNames()
stats_table_schemas = {
    stats_field: stats_struct[stats_field].dataType.elementType
    for stats_field in stats_table_names
}

+---------+--------+--------+------+------+------+---+-----------+----+
|ShortName|FullName|Position|Height|Weight|Footed|DOB|Nationality|Club|
+---------+--------+--------+------+------+------+---+-----------+----+
+---------+--------+--------+------+------+------+---+-----------+----+



In [10]:
def process_row(row: "Row") -> "Row":
    """Print a player document's ``_id`` and return its ``info`` struct.

    Args:
        row: One player document, read by key (``row['_id']``, ``row['info']``).

    Returns:
        The value stored under ``'info'``. For rows of ``player_stream_df``, this is
        the nested info struct (ShortName, FullName, Position, ...).
    """
    player_id = row['_id']
    player_info = row['info']

    # Debug trace. If this runs inside a Spark task (e.g. via `foreach`), the output
    # goes to the worker's stdout, not to this notebook.
    print(player_id)

    return player_info

In [12]:
# Flatten the nested `info` struct into one column per field (ShortName, FullName, ...).
# `mapInPandas(process_row, ...)` cannot work here: mapInPandas calls its function with
# an iterator of pandas DataFrames and expects an iterator of DataFrames back, but
# `process_row` takes a single Row. A plain projection yields the same columns.
player_info_df = player_stream_df.select('info.*')

# The next cell streams this frame under the name `map`, so keep that name bound too.
# NOTE(review): `map` shadows the Python builtin; rename it in both cells together.
map = player_info_df

In [18]:
# Print every flattened player-info row. Keep the query handle so this query can be
# stopped on its own (`info_print_query.stop()`) instead of only via spark.stop().
# NOTE(review): `foreach` callbacks run in Spark's Python workers, so the rows most
# likely appear in the console that launched Spark, not in this cell's output.
info_print_query = map.writeStream.foreach(print).start()
info_print_query

<pyspark.sql.streaming.StreamingQuery at 0x2d51d93d160>

### Test: echo the raw player stream to the console

In [19]:
# Debug sink: append every raw player document to the console.
# Uses Spark's continuous-processing trigger with a 1-second checkpoint interval.
stream_writer = (
    player_stream_df.writeStream
    .outputMode("append")
    .format("console")
    .trigger(continuous="1 second")
)

stream_writer.start()

<pyspark.sql.streaming.StreamingQuery at 0x2d51d9a5370>

# Write Stream to PostgreSQL

In [120]:
def _save_to_Postgres(df, epoch_id, table_name):         
    df.write \
        .mode('append') \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://postgres:5432/news_crawled") \
        .option("driver", "org.postgresql.Driver") \
        .option("dbtable", table_name) \
        .option("user", 'test1') \
        .option("password", 'test1') \
        .save() 

In [121]:
# Start one PostgreSQL sink per stats table, writing to `player.<table_name>`.
# NOTE(review): `stats_tables` (table name -> streaming DataFrame) is not created by any
# executed cell in this notebook; it must be defined before this cell runs.
# NOTE(review): `processingTime` is a micro-batch trigger; confirm the MongoDB connector
# version in use supports micro-batch streaming reads.
postgresql_streams = {}
for table_name, stats_df in stats_tables.items():
    full_table_name = 'player.' + table_name
    postgresql_streams[full_table_name] = (
        stats_df
        .writeStream
        .trigger(processingTime='5 seconds')
        .outputMode('update')
        # Bind the target table now. A lambda would look up `full_table_name` only
        # when a batch runs, so every query would write to the last loop's table.
        .foreachBatch(functools.partial(_save_to_Postgres, table_name=full_table_name))
        .start()
    )

# Wait for the Streams to Finish, Then Stop Spark

In [None]:
# Block until any active streaming query terminates (re-raising its error if it failed).
# Then release the Spark session, even if the wait ends with an exception or a kernel
# interrupt.
try:
    spark.streams.awaitAnyTermination()
finally:
    spark.stop()