In [4]:
from pyspark.sql.types import *
from pyspark.sql import SparkSession

if __name__ == "__main__":
    sparkSession = SparkSession.builder.master('local')\
                                    .appName('SparkStreamingAppendMode')\
                                    .getOrCreate()
    
    sparkSession.sparkContext.setLogLevel('ERROR')
    
    schema = StructType([StructField('category', StringType(), True),
                         StructField('on twitter since', StringType(), True),
                         StructField('twitter handle', StringType(), True),
                         StructField('profile url', StringType(), True),
                         StructField('followers', StringType(), True),
                         StructField('following', StringType(), True),
                         StructField('profile location', StringType(), True),
                         StructField('profile lat/lon', StringType(), True),
                         StructField('profile description', StringType(), True)
    ])
    
    fileStreamDF = sparkSession.readStream\
                                .option('header', 'true')\
                                .schema(schema)\
                                .csv('../input/datasets/dropfolder')
    
    fileStreamDF = fileStreamDF.withColumnRenamed('twitter handle', 'twitter_handle')\
                                .withColumnRenamed('profile location', 'profile_location')
    
    print(' ')
    print('Is the stream ready?')
    print(fileStreamDF.isStreaming)
    
    print(' ')
    print('Schema of the input Stream')
    print(fileStreamDF.printSchema)
    
    trimmedDF = fileStreamDF.select(fileStreamDF.category,
                                   fileStreamDF.twitter_handle,
                                   fileStreamDF.profile_location,
                                   fileStreamDF.followers)\
                                    .withColumnRenamed('followers', 'companions')
    
    query = trimmedDF.writeStream\
                        .outputMode('append')\
                        .format('console')\
                        .option('truncate', 'false')\
                        .option('numRows', 30)\
                        .start()\
                        .awaitTermination()

 
Is the stream ready?
True
 
Schema of the input Stream
<bound method DataFrame.printSchema of DataFrame[category: string, on twitter since: string, twitter_handle: string, profile url: string, followers: string, following: string, profile_location: string, profile lat/lon: string, profile description: string]>


KeyboardInterrupt: 

In [None]:
# Default count
from pyspark.sql.types import *
from pyspark.sql import SparkSession

if __name__ == "__main__":
    sparkSession = SparkSession.builder.master('local')\
                                    .appName('SparkStreamingCompleteMode')\
                                    .getOrCreate()
    
    sparkSession.sparkContext.setLogLevel('ERROR')
    
    schema = StructType([StructField('category', StringType(), True),
                         StructField('on twitter since', StringType(), True),
                         StructField('twitter handle', StringType(), True),
                         StructField('profile url', StringType(), True),
                         StructField('followers', StringType(), True),
                         StructField('following', StringType(), True),
                         StructField('profile location', StringType(), True),
                         StructField('profile lat/lon', StringType(), True),
                         StructField('profile description', StringType(), True)
    ])
    
    fileStreamDF = sparkSession.readStream\
                                .option('header', 'true')\
                                .option('maxFilesPerTrigger', 1)\
                                .schema(schema)\
                                .csv('../input/datasets/dropfolder')

    # groupby category and find the count with default count
    recordsPerCategory = fileStreamDF.groupby('category')\
                                        .count()\
                                        .orderBy('count', ascending=False)
    
    query = trimmedDF.writeStream\
                        .outputMode('complete')\
                        .format('console')\
                        .option('truncate', 'false')\
                        .option('numRows', 30)\
                        .start()\
                        .awaitTermination()

In [None]:
#Aggregate functions
from pyspark.sql.types import *
from pyspark.sql import SparkSession

if __name__ == "__main__":
    sparkSession = SparkSession.builder.master('local')\
                                    .appName('SparkStreamingAggregate')\
                                    .getOrCreate()
    
    sparkSession.sparkContext.setLogLevel('ERROR')
    
    schema = StructType([StructField('category', StringType(), True),
                         StructField('on twitter since', StringType(), True),
                         StructField('twitter handle', StringType(), True),
                         StructField('profile url', StringType(), True),
                         StructField('followers', StringType(), True),
                         StructField('following', StringType(), True),
                         StructField('profile location', StringType(), True),
                         StructField('profile lat/lon', StringType(), True),
                         StructField('profile description', StringType(), True)
    ])
    
    fileStreamDF = sparkSession.readStream\
                                .option('header', 'true')\
                                .option('maxFilesPerTrigger', 1)\
                                .schema(schema)\
                                .csv('../input/datasets/dropfolder')

    # groupby the dataframe based on category column and find the sum with aggregate functions
    recordsPerCategory = fileStreamDF.groupby('category')\
                                        .agg({'followers': 'sum'})\
                                        .withColumnRenamed('sum(followers)', 'total_followers')
                                        .orderBy('total_followers', ascending=False)
    
    query = trimmedDF.writeStream\
                        .outputMode('complete')\
                        .format('console')\
                        .option('truncate', 'false')\
                        .option('numRows', 30)\
                        .start()\
                        .awaitTermination()

In [None]:
#sql functions
from pyspark.sql.types import *
from pyspark.sql import SparkSession

if __name__ == "__main__":
    sparkSession = SparkSession.builder.master('local')\
                                    .appName('SparkStreamingSQLQuery')\
                                    .getOrCreate()
    
    sparkSession.sparkContext.setLogLevel('ERROR')
    
    schema = StructType([StructField('category', StringType(), True),
                         StructField('on twitter since', StringType(), True),
                         StructField('twitter handle', StringType(), True),
                         StructField('profile url', StringType(), True),
                         StructField('followers', StringType(), True),
                         StructField('following', StringType(), True),
                         StructField('profile location', StringType(), True),
                         StructField('profile lat/lon', StringType(), True),
                         StructField('profile description', StringType(), True)
    ])
    
    fileStreamDF = sparkSession.readStream\
                                .option('header', 'true')\
                                .option('maxFilesPerTrigger', 1)\
                                .schema(schema)\
                                .csv('../input/datasets/dropfolder')
    
    fileStreamDF.createOrReplaceTempView('disaster_accident_crime_accounts')
    
    categoryDF = SparkSession.sql("SELECT from category, following\
                                    FROM disaster_accident_crime_accounts\
                                    WHERE followers > '15000'")
    
    from pyspark.sql.functions import format_number
    from pyspark.sql.functions import col
    

    # groupby the dataframe based on category column and find the sum with aggregate functions
    recordsPerCategory = fileStreamDF.groupby('category')\
                                        .agg({'followers': 'sum'})\
                                        .withColumnRenamed('sum(followers)', 'total_followers')
                                        .orderBy('total_followers', ascending=False)\
                                        .withColumn('total_following', format_number(col('total_following'), 0))
    
    query = trimmedDF.writeStream\
                        .outputMode('complete')\
                        .format('console')\
                        .option('truncate', 'false')\
                        .option('numRows', 30)\
                        .start()\
                        .awaitTermination()

In [None]:
#UDF
from pyspark.sql.types import *
from pyspark.sql import SparkSession

from pyspark.sql.functions import udf
import time
import datetime

if __name__ == "__main__":
    sparkSession = SparkSession.builder.master('local')\
                                    .appName('SparkStreamingAddTimeStamp')\
                                    .getOrCreate()
    
    sparkSession.sparkContext.setLogLevel('ERROR')
    
    schema = StructType([StructField('category', StringType(), True),
                         StructField('on twitter since', StringType(), True),
                         StructField('twitter handle', StringType(), True),
                         StructField('profile url', StringType(), True),
                         StructField('followers', StringType(), True),
                         StructField('following', StringType(), True),
                         StructField('profile location', StringType(), True),
                         StructField('profile lat/lon', StringType(), True),
                         StructField('profile description', StringType(), True)
    ])
    
    fileStreamDF = sparkSession.readStream\
                                .option('header', 'true')\
                                .option('maxFilesPerTrigger', 1)\
                                .schema(schema)\
                                .csv('../input/datasets/dropfolder')
    
    fileStreamDF = fileStreamDF.withColumnRenamed('twitter handle', 'twitter_handle')\
                            .withColumnRenamed('profile location', 'profile_location')
    
    def add_timestamp():
        ts = time.time()
        timestamp = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
        return timestamp
    
    add_timestamp_udf = udf(add_timestamp, StringType())
    
    fileStreamWithTS = fileStreamDF.withColumn('timestamp', add_timestamp_udf())
    
    trimmedDF = fileStreamWithTS.select('category', 
                                       'twitter_handle', 
                                       'followers',
                                       'timestamp')
    
    query = trimmedDF.writeStream\
                        .outputMode('append')\
                        .format('console')\
                        .option('truncate', 'false')\
                        .option('numRows', 30)\
                        .start()\
                        .awaitTermination()