<a href="https://colab.research.google.com/github/parlad/Spark-ML/blob/master/readMultipleFile.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Colab setup: install a headless OpenJDK 8, download and unpack the Spark 2.3.1
# distribution pre-built for Hadoop 2.7 into the working directory, and install
# findspark (used by the next cell to make that Spark importable from Python).
!apt-get update
# -qq and the redirect silence the (very long) apt output.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
!tar xf spark-2.3.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os

# Point the process environment at the JDK and the Spark distribution unpacked
# by the setup cell, then let findspark put that Spark's Python libraries on sys.path.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.3.1-bin-hadoop2.7",
})

import findspark
findspark.init()

In [None]:
import pyspark
from pyspark.sql import SparkSession

In [None]:
import sys
 
from pyspark import SparkContext, SparkConf
 
if __name__ == "__main__":

  # Create (or reuse) the Spark context. The conf must be passed to getOrCreate,
  # otherwise it is dead code; it only takes effect when no context exists yet --
  # an already-running context is returned unchanged.
  conf = SparkConf().setAppName("Read Text to RDD - Python")
  sc = SparkContext.getOrCreate(conf=conf)

  # textFile accepts a comma-separated list of paths: all three files are read
  # into a single RDD with one element per line.
  lines = sc.textFile("/content/file1.txt,/content/file2.txt,/content/file3.txt")

  # collect() pulls every line to the driver -- acceptable for small demo files.
  llist = lines.collect()

  # print the lines
  for line in llist:
    print(line)

## Read all text files in a directory into a single RDD

In [None]:
import sys
 
from pyspark import SparkContext, SparkConf
 
if __name__ == "__main__":

  # Create (or reuse) the Spark context. Passing conf applies the app name when
  # no context exists yet; an already-running context is returned unchanged.
  conf = SparkConf().setAppName("Read Text to RDD - Python")
  sc = SparkContext.getOrCreate(conf=conf)

  # A directory path makes textFile read every file inside it into one RDD of lines.
  lines = sc.textFile("/content/mldata/")

  # collect() pulls every line to the driver -- acceptable for small demo files.
  llist = lines.collect()

  # print the lines
  for line in llist:
    print(line)

## Read all text files in multiple directories into a single RDD


In [None]:
import sys
 
from pyspark import SparkContext, SparkConf
 
if __name__ == "__main__":

  # Create (or reuse) the Spark context. Passing conf applies the app name when
  # no context exists yet; an already-running context is returned unchanged.
  conf = SparkConf().setAppName("Read Text to RDD - Python")
  sc = SparkContext.getOrCreate(conf=conf)

  # Comma-separated directories: every file in each directory is read into one RDD of lines.
  lines = sc.textFile("/content/ml1data,/content/mldata")

  # collect() pulls every line to the driver -- acceptable for small demo files.
  llist = lines.collect()

  # print the lines
  for line in llist:
    print(line)

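## Read all text files matching a pattern into a single RDD

`sc.textFile` also accepts Hadoop glob patterns. For example, `sc.textFile("/content/mldata/*.txt")` reads only the files ending in `.txt` in that directory.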


## PySpark – Split a DataFrame into chunks with an equal number of rows

In [None]:
# importing module
import pyspark

# importing sparksession from pyspark.sql module
from pyspark.sql import SparkSession

# Reuse the active SparkSession, or start one named 'sparkdf'.
spark = (
    SparkSession.builder
    .appName('sparkdf')
    .getOrCreate()
)

# Sample inventory: column names, then one (Brand, Product) tuple per row.
columns = ["Brand", "Product"]
data = [
    ("HP", "Laptop"),
    ("Lenovo", "Mouse"),
    ("Dell", "Keyboard"),
    ("Samsung", "Monitor"),
    ("MSI", "Graphics Card"),
    ("Asus", "Motherboard"),
    ("Gigabyte", "Motherboard"),
    ("Zebronics", "Cabinet"),
    ("Adata", "RAM"),
    ("Transcend", "SSD"),
    ("Kingston", "HDD"),
    ("Toshiba", "DVD Writer"),
]

# Column names come from `columns`; column types are inferred from the tuples.
prod_df = spark.createDataFrame(data=data, schema=columns)

# Render the DataFrame as a text table.
prod_df.show()


## Split the DataFrame, perform an operation on each split, and concatenate the results

In [None]:
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import concat, col, lit
from pyspark.sql.functions import monotonically_increasing_id, ntile
from pyspark.sql.window import Window

# Number of splits to cut `prod_df` into.
n_splits = 4
if n_splits < 1:
    raise ValueError("n_splits must be >= 1")


def modify_dataframe(data):
    """Concatenate ``Brand`` and ``Product`` of each row into one string column.

    Args:
        data: DataFrame with ``Brand`` and ``Product`` columns.

    Returns:
        A single-column DataFrame named ``'Brand - Product'`` whose values are
        ``"<Brand> - <Product>"`` (aliased so it matches ``schema`` below
        instead of Spark's auto-generated ``concat(...)`` column name).
    """
    return data.select(
        concat(col("Brand"), lit(" - "), col("Product")).alias("Brand - Product")
    )


# Number every row in its original order, then let ntile() assign it to one of
# `n_splits` buckets whose sizes differ by at most one row. The previous
# `count() // n_splits` + limit()/subtract() approach silently dropped the
# remainder rows, removed duplicate rows (subtract() is a set difference) and
# relied on whatever arbitrary order limit() saw after subtract().
# The un-partitioned window moves all rows to one partition -- fine for demo data.
split_window = Window.orderBy("_order")
split_df = (
    prod_df
    .withColumn("_order", monotonically_increasing_id())
    .withColumn("_split", ntile(n_splits).over(split_window))
    .cache()  # reuse one numbering for every per-split filter below
)

# Empty DataFrame with the output schema, used to accumulate per-split results.
schema = StructType([
    StructField('Brand - Product', StringType(), True)
])
result_df = spark.createDataFrame(data=[], schema=schema)

for split_no in range(1, n_splits + 1):
    # Rows of this split, in original order, without the helper columns.
    temp_df = (
        split_df
        .filter(col("_split") == split_no)
        .orderBy("_order")
        .drop("_order", "_split")
    )

    # Perform the operation on this split and show it.
    temp_df_mod = modify_dataframe(data=temp_df)
    temp_df_mod.show(truncate=False)

    # union() matches columns by position; both sides have one string column.
    result_df = result_df.union(temp_df_mod)

result_df.show(truncate=False)


## Another example: streaming the Iris dataset through Kafka into Spark Structured Streaming

In [None]:
!pip install kafka

In [None]:
# NOTE(review): py4j also ships inside the Spark tarball unpacked in the setup
# cell, and findspark.init() adds it to sys.path -- this install may be redundant.
!pip install py4j

In [None]:
# NOTE(review): this installs the latest PySpark from PyPI (unpinned), separately
# from the Spark 2.3.1 distribution wired up through findspark at the top of the
# notebook. The streaming cell uses from_csv, which needs PySpark >= 3.0 --
# decide on one Spark install and pin its version.
!pip install pyspark

In [None]:
import pandas as pd
from kafka import KafkaProducer
from datetime import datetime
import time
import random
import numpy as np

# pip install kafka-python

KAFKA_TOPIC_NAME_CONS = "Topic"
KAFKA_BOOTSTRAP_SERVERS_CONS = 'localhost:9092'

# Field order of every CSV message; a consumer must parse columns in this order.
FLOWER_MESSAGE_FIELDS = ("order_id", "sepal_length", "sepal_width",
                         "petal_length", "petal_width", "species")


def format_flower_message(record):
    """Serialize one flower record into a comma-separated message string.

    Args:
        record: mapping containing at least the keys in ``FLOWER_MESSAGE_FIELDS``
            (e.g. one element of ``DataFrame.to_dict(orient="records")``);
            extra keys are ignored.

    Returns:
        The values of ``FLOWER_MESSAGE_FIELDS`` converted with ``str`` and
        joined with ``","``.

    Raises:
        KeyError: if ``record`` lacks one of the fields.
    """
    return ','.join(str(record[field]) for field in FLOWER_MESSAGE_FIELDS)


if __name__ == "__main__":
    print("Kafka Producer Application Started ... ")

    # Messages are sent as UTF-8 encoded strings.
    kafka_producer_obj = KafkaProducer(bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS_CONS,
                                       value_serializer=lambda x: x.encode('utf-8'))

    filepath = "IRIS.csv"
    flower_df = pd.read_csv(filepath)
    # Sequential id (0..n-1) attached to every row before sending.
    flower_df['order_id'] = np.arange(len(flower_df))

    flower_list = flower_df.to_dict(orient="records")

    try:
        for record in flower_list:
            message = format_flower_message(record)
            print("Message: ", message)
            kafka_producer_obj.send(KAFKA_TOPIC_NAME_CONS, message)
            # Throttle to one message per second to simulate a live feed.
            time.sleep(1)
    finally:
        # send() is asynchronous: flush() blocks until buffered records are
        # delivered, close() releases the producer's connections.
        kafka_producer_obj.flush()
        kafka_producer_obj.close()

    print("Kafka Producer Application Completed. ")

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import Normalizer, StandardScaler
import random

import time

kafka_topic_name = "Topic"
kafka_bootstrap_servers = 'localhost:9092'

# NOTE(review): if a SparkSession already exists in this kernel, getOrCreate()
# returns it and the appName/master below have no effect. The Kafka source also
# needs the spark-sql-kafka connector on the classpath -- confirm how it is provided.
spark = SparkSession \
        .builder \
        .appName("Structured Streaming ") \
        .master("local[*]") \
        .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

# Streaming DataFrame of raw Kafka records, starting from the newest offsets.
flower_df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
        .option("subscribe", kafka_topic_name) \
        .option("startingOffsets", "latest") \
        .load()

# Kafka's `value` column is binary; cast it to the CSV text the producer sent.
flower_df1 = flower_df.selectExpr("CAST(value AS STRING)", "timestamp")

# DDL schema of the CSV payload, in the producer's field order. Column names
# must be unique, otherwise `flower.*` below yields ambiguous columns.
flower_schema_string = ",".join([
    "order_id INT",
    "sepal_length DOUBLE",
    "sepal_width DOUBLE",
    "petal_length DOUBLE",
    "petal_width DOUBLE",
    "species STRING",
])

# NOTE(review): from_csv requires PySpark >= 3.0; the Spark 2.3.1 install at the
# top of the notebook does not provide it.
flower_df2 = flower_df1 \
        .select(from_csv(col("value"), flower_schema_string) \
                .alias("flower"), "timestamp")

# Flatten the parsed struct into top-level columns.
flower_df3 = flower_df2.select("flower.*", "timestamp")

flower_df3.createOrReplaceTempView("flower_find")
song_find_text = spark.sql("SELECT * FROM flower_find")

# Every 5 seconds append the new rows to the in-memory table `testedTable`,
# queryable with spark.sql("SELECT * FROM testedTable").
flower_agg_write_stream = song_find_text \
        .writeStream \
        .trigger(processingTime='5 seconds') \
        .outputMode("append") \
        .option("truncate", "false") \
        .format("memory") \
        .queryName("testedTable") \
        .start()

# Wait at most 1 second; the streaming query keeps running in the background.
flower_agg_write_stream.awaitTermination(1)