In [18]:
pip install pyspark




In [19]:
from pyspark.sql import SparkSession

# Build (or reuse) a session that runs locally on every available core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Word Count")
    .config("spark.some.config.option", "SparkSessionExample")
    .getOrCreate()
)

# Example action: materialise a DataFrame holding 0..9 and bring its rows
# back to the driver as a Python list.
first_ten = spark.range(10).collect()
print(first_ten)

# Shut the session down.
spark.stop()





In [2]:
from pyspark.sql import SparkSession

# Reuse the active session if there is one; otherwise start a new one.
spark = SparkSession.builder.appName("Example Application").getOrCreate()

# The session exposes its underlying SparkContext, which is what the RDD API uses.
sc = spark.sparkContext

In [20]:
# With `sc` available, distribute a small local list as an RDD and
# pull its contents back to the driver.
rdd = sc.parallelize([1, 2, 3, 4, 5])
collected = rdd.collect()
print(collected)

# Release the session once the example is finished.
spark.stop()




In [None]:
# Word Count: output to file
from pyspark.sql import SparkSession

# Earlier cells end with spark.stop(), which also shuts down the SparkContext
# behind `sc`. Obtain a live session/context here so this cell runs on its own.
spark = SparkSession.builder.appName("Word Count").getOrCreate()
sc = spark.sparkContext

# Assuming you're using local files, but adjust the path as necessary
file = sc.textFile("file:///path/to/your/1112.txt")
# Split each line into words using flatMap. split() with no argument splits on
# any run of whitespace, so repeated spaces do not produce empty-string "words".
words = file.flatMap(lambda line: line.split())
# Map words to (word, 1) pairs
pairs = words.map(lambda word: (word, 1))
# Reduce by key to count each word
counts = pairs.reduceByKey(lambda a, b: a + b)
# Save the result to a local file system or HDFS.
# NOTE: by default saveAsTextFile raises if the output directory already exists.
counts.saveAsTextFile("file:///path/to/output")  # Change this path if using HDFS or another storage location

# Always stop the Spark session when you're done
spark.stop()

In [None]:
# Word Count: output to terminal
from pyspark.sql import SparkSession

# Earlier cells end with spark.stop(), which also shuts down the SparkContext
# behind `sc`. Obtain a live session/context here so this cell runs on its own.
spark = SparkSession.builder.appName("Word Count").getOrCreate()
sc = spark.sparkContext

file = sc.textFile("file:///path/to/your/1112.txt")
# split() with no argument splits on any run of whitespace, so repeated
# spaces do not produce empty-string "words".
words = file.flatMap(lambda line: line.split())
pairs = words.map(lambda word: (word, 1))
counts = pairs.reduceByKey(lambda a, b: a + b)

# Collect results to the driver and print them
results = counts.collect()
for (word, count) in results:
    print(word, count)

# Always stop the Spark session when you're done
spark.stop()

## SparkSQL

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws, explode, split

# Initialize a Spark session
spark = SparkSession.builder \
    .appName("Word Count from JSON") \
    .getOrCreate()

# Read data from a JSON file
df = spark.read.json("input.json")

# Assuming the JSON contains a field 'text' that contains text data.
# split() turns the text into an array of words (splitting on whitespace runs),
# and explode() creates a new row for each array element.
words = df.select(explode(split(df.text, "\\s+")).alias("word"))

# Group by the word itself and count the occurrences of each word.
word_counts = words.groupBy("word").count()

# Show results in the console
word_counts.show()

# Write the results to a Parquet file, a columnar storage file format optimized for reading speed.
# mode("overwrite") lets the cell be re-run; the default mode errors if the path already exists.
word_counts.write.mode("overwrite").parquet("word_counts.parquet")

# Write the results as text. The text writer needs a single string column, so
# build "word: count" lines first. Using the DataFrame writer (rather than
# RDD.saveAsTextFile) allows overwrite mode, so re-running does not fail on an
# existing output directory.
word_counts \
    .select(concat_ws(": ", col("word"), col("count").cast("string")).alias("value")) \
    .write \
    .mode("overwrite") \
    .text("word_counts.txt")

# Stop the Spark session
spark.stop()


## Spark Examples: Sample data.csv below

id,name,age,department,salary
1,John Doe,58,Finance,55000
2,Jane Smith,34,Marketing,62000
3,Bob Johnson,45,Human Resources,45000
4,Lisa Sway,30,IT,76000
5,Michael Vicks,60,Finance,83000


In [21]:
# 1. Basic Data Loading and Transformation
# Read a CSV into a DataFrame, keep two columns, and derive a boolean flag.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Data Loading Example").getOrCreate()

# Treat the first row as a header and let Spark infer the column types.
df = spark.read.csv(
    "file:///path/to/your/data.csv",
    header=True,
    inferSchema=True,
)

# Flag rows whose age is above 55, then attach the flag next to name and age.
is_senior = df.age > 55
transformed_df = df.select("name", "age").withColumn("senior", is_senior)

# Print the transformed rows.
transformed_df.show()

spark.stop()





In [None]:
# 2. Using Spark SQL for Data Querying
# Creating Temporary Views and Running SQL Queries:

from pyspark.sql import SparkSession

# The data-loading cell ends with spark.stop(), so its session and DataFrames
# are no longer usable. Get a live session and reload the data here.
spark = SparkSession.builder \
    .appName("Spark SQL Example") \
    .getOrCreate()

df = spark.read.csv("file:///path/to/your/data.csv", header=True, inferSchema=True)

# The query below selects and filters on `senior`, which is not a column of the
# raw CSV (id, name, age, department, salary). Derive it before registering the view.
transformed_df = df.select("name", "age").withColumn("senior", df.age > 55)
transformed_df.createOrReplaceTempView("people")

# Execute SQL query
result = spark.sql("""
SELECT name, age, senior
FROM people
WHERE senior = TRUE
""")

result.show()

spark.stop()


In [None]:
# Creating and Writing a DataFrame to Parquet
# Writing to Parquet: This format is highly efficient for both storage and processing. It’s a columnar storage 
# file format, which allows for better compression and enhanced query performance by allowing I/O to be performed
# at the column level rather than at the row level.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Write and Read Parquet Example") \
    .getOrCreate()

# Define the schema for the DataFrame (every field is declared nullable)
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("department", StringType(), True),
    StructField("salary", IntegerType(), True)
])

# Create some data
data = [
    (1, "John Doe", 58, "Finance", 55000),
    (2, "Jane Smith", 34, "Marketing", 62000),
    (3, "Bob Johnson", 45, "Human Resources", 45000),
    (4, "Lisa Sway", 30, "IT", 76000),
    (5, "Michael Vicks", 60, "Finance", 83000)
]

# Create DataFrame
df = spark.createDataFrame(data, schema=schema)

# Write DataFrame to Parquet.
# mode("overwrite") replaces any previous output so the cell can be re-run;
# the default save mode raises an error when the path already exists.
df.write.mode("overwrite").parquet("path/to/output/data.parquet")

# Stop the session
spark.stop()

In [None]:
# 3. Aggregations and Grouping
# Read the Parquet data back and compute per-department aggregates.

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, count

# Start (or reuse) a session for this example.
spark = SparkSession.builder.appName("Aggregations and Grouping").getOrCreate()

# Read the Parquet data at the given path.
df = spark.read.parquet("path/to/output/data.parquet")

# Describe each aggregate separately, then apply both per department.
headcount = count("id").alias("num_employees")
mean_salary = avg("salary").alias("average_salary")
aggregated_df = df.groupBy("department").agg(headcount, mean_salary)

# Print the aggregated rows.
aggregated_df.show()

# Shut the session down.
spark.stop()

## Integrating with Data Sources

In [7]:
import os

# Database connection parameters.
# Credentials are read from the environment (PGUSER / PGPASSWORD) rather than
# typed into the notebook, so they are never saved with it. When the variables
# are unset the values fall back to empty strings.
conn_params = {
    "dbname": 'postgres',
    "user": os.environ.get("PGUSER", ''),  # PostgreSQL username
    "password": os.environ.get("PGPASSWORD", ''),  # PostgreSQL password
    "host": 'localhost'
}

In [22]:
## Writing Data to PostgreSQL
## ensure that the public.persons table exists in your database or 
## modify the script to create the table if it does not exist.
## download jdbc jar from https://jdbc.postgresql.org/download/

import os

from pyspark.sql import SparkSession

# Location of the PostgreSQL JDBC driver jar. Set POSTGRES_JDBC_JAR to point at
# your own download; the fallback is the path this notebook was written with.
jdbc_jar = os.environ.get(
    "POSTGRES_JDBC_JAR",
    "/Users/phinnx.com/Downloads/postgresql-42.7.3.jar",
)

# Create Spark session with PostgreSQL JDBC driver included
spark = SparkSession.builder \
    .appName("PySpark PostgreSQL Integration") \
    .config("spark.jars", jdbc_jar) \
    .getOrCreate()

# Connection credentials come from the environment (PGUSER / PGPASSWORD) so
# they are not stored in the notebook; unset variables fall back to "".
conn_params = {
    "user": os.environ.get("PGUSER", ""),
    "password": os.environ.get("PGPASSWORD", ""),
}

# Sample DataFrame to write
data = [("James", "Smith", "USA", 1),
        ("Michael", "Rose", "USA", 2),
        ("Robert", "Williams", "USA", 3),
        ("Maria", "Jones", "USA", 4)]
columns = ["firstname", "lastname", "country", "id"]
df = spark.createDataFrame(data, schema=columns)

# JDBC URL
url = "jdbc:postgresql://localhost/postgres"

# Write DataFrame to PostgreSQL table; "overwrite" mode replaces existing contents
df.write \
    .format("jdbc") \
    .option("url", url) \
    .option("dbtable", "public.persons") \
    .option("user", conn_params["user"]) \
    .option("password", conn_params["password"]) \
    .option("driver", "org.postgresql.Driver") \
    .mode("overwrite") \
    .save()

# Stop the Spark session
spark.stop()





In [23]:
# Read data from PostgreSQL table
import os

from pyspark.sql import SparkSession

# The write cell ends with spark.stop(), so `spark` is no longer usable here.
# Start a session again with the PostgreSQL JDBC jar on its classpath
# (POSTGRES_JDBC_JAR overrides the default jar location).
spark = SparkSession.builder \
    .appName("PySpark PostgreSQL Integration") \
    .config("spark.jars", os.environ.get(
        "POSTGRES_JDBC_JAR",
        "/Users/phinnx.com/Downloads/postgresql-42.7.3.jar",
    )) \
    .getOrCreate()

# `url` and `conn_params` are plain Python values defined in the write cell and
# remain valid after the session was stopped.
jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", url) \
    .option("dbtable", "public.persons") \
    .option("user", conn_params["user"]) \
    .option("password", conn_params["password"]) \
    .option("driver", "org.postgresql.Driver") \
    .load()

jdbcDF.show()

# Stop the Spark session
spark.stop()







## Streaming Data Processing

In [26]:
## Python script that sends data to a socket. This can simulate streaming data
## NOTE: accept() and the send loop below block until interrupted, so run this
## in a separate process/kernel from the Spark reader that connects to it.

import socket
import time

# Use localhost
host = 'localhost'  # or use '127.0.0.1'

# Reserve a port for your service
port = 9999

# The `with` blocks guarantee both the listening socket and the client
# connection are closed, whichever way the loop ends.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    # Allow immediate re-binding after a restart instead of failing with
    # "Address already in use" while the old socket is in TIME_WAIT.
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    # Bind to the port
    s.bind((host, port))

    # Wait for client connection
    s.listen(5)
    print(f"Server listening on port {port}...")

    # Establish connection with client
    conn, addr = s.accept()
    with conn:
        print(f"Connected by {addr}")
        try:
            while True:
                # sendall() keeps writing until the whole message is sent;
                # send() may transmit only part of the buffer.
                conn.sendall("Hello Spark Streaming\n".encode('utf-8'))
                time.sleep(1)  # slow down the loop to simulate streaming data
        except KeyboardInterrupt:
            print("Stopped by the user")
        except (BrokenPipeError, ConnectionResetError):
            # The reader went away; end the loop cleanly instead of crashing.
            print("Client disconnected")




In [25]:
## PySpark script that consumes the socket stream and keeps a running word count

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

# Start (or reuse) a session for the streaming job.
spark = SparkSession.builder.appName("Network Word Count").getOrCreate()

# Streaming source: text lines arriving on localhost:9999.
socket_reader = (
    spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
)
lines = socket_reader.load()

# One output row per space-separated token of each incoming line.
word_column = expr("explode(split(value, ' ')) as word")
words = lines.select(word_column)

# Running count per distinct word.
wordCounts = words.groupBy("word").count()

# Print the counts to the console using "complete" output mode.
console_writer = wordCounts.writeStream.outputMode("complete").format("console")
query = console_writer.start()

# Block until the streaming query terminates.
query.awaitTermination()


