In [None]:
!pip install pyspark


Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285387 sha256=4b374a7903c25f15d97f123bd64666d760ce3f10c974c002266176d58eec6973
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [None]:
# Import necessary modules
from pyspark.sql import SparkSession

# Get the running Spark session, or start one for the RDD walkthrough
spark = (
    SparkSession.builder
    .appName("RDD Operations Example")
    .getOrCreate()
)

# Distribute a small list of integers as an RDD
data = list(range(1, 6))
rdd = spark.sparkContext.parallelize(data)

# Example 1: map -- exactly one output (the square) per input element
squared_rdd = rdd.map(lambda value: value * value)
print("Squared elements:", squared_rdd.collect())

# Example 2: filter -- keep only the elements divisible by two
even_rdd = rdd.filter(lambda value: value % 2 == 0)
print("Even elements:", even_rdd.collect())

# Example 3: flatMap -- each element expands to (itself, its triple) and the pairs are flattened
flattened_rdd = rdd.flatMap(lambda value: [value, value * 3])
print("Flattened elements:", flattened_rdd.collect())

# Example 4: count -- an action returning how many elements the RDD holds
count = rdd.count()
print("Number of elements:", count)

# Example 5: reduce -- fold the elements pairwise into their sum
sum_of_elements = rdd.reduce(lambda left, right: left + right)
print("Sum of elements:", sum_of_elements)


Squared elements: [1, 4, 9, 16, 25]
Even elements: [2, 4]
Flattened elements: [1, 3, 2, 6, 3, 9, 4, 12, 5, 15]
Number of elements: 5
Sum of elements: 15


In [None]:
from pyspark.sql import SparkSession, Row

# Get the running Spark session, or start one for the DataFrame walkthrough
spark = (
    SparkSession.builder
    .appName("DataFrame Operations Example")
    .getOrCreate()
)

# Two Row records distributed as an RDD; the DataFrame is then built from that RDD
rdd = spark.sparkContext.parallelize([
    Row(id=1, name="John"),
    Row(id=2, name="Mike"),
])
df = spark.createDataFrame(rdd)

# Full table
print("Original DataFrame:")
df.show()

# Projection: keep a single column
print("Select 'name' column:")
df.select(df["name"]).show()

# Row filter: keep rows whose id is greater than 1
print("Filter by 'id > 1':")
df.where(df["id"] > 1).show()


Original DataFrame:
+---+----+
| id|name|
+---+----+
|  1|John|
|  2|Mike|
+---+----+

Select 'name' column:
+----+
|name|
+----+
|John|
|Mike|
+----+

Filter by 'id > 1':
+---+----+
| id|name|
+---+----+
|  2|Mike|
+---+----+



In [None]:
from pyspark import SparkFiles
from PIL import Image
import io

# Get the running Spark session, or create one for this example.
# NOTE: SparkSession is not imported in this cell; it relies on the import done by an earlier cell.
spark = SparkSession.builder \
    .appName("Image Audio Example") \
    .getOrCreate()

# Register the file with the SparkContext so it can be looked up by name through SparkFiles.
# A local path is used here; per the original note, HDFS paths can be added as well.
spark.sparkContext.addFile("/content/image_cutout2.png")

# SparkFiles.get() turns the bare file name into the path of Spark's copy; read that copy as raw bytes
with open(SparkFiles.get("image_cutout2.png"), "rb") as f:
    img_data = f.read()

# Wrap the bytes in an in-memory buffer and open them as a PIL image
image = Image.open(io.BytesIO(img_data))


In [None]:
import soundfile as sf

# Register the audio file with the SparkContext so it can be looked up by name through SparkFiles.
# A local path is used here; per the original note, HDFS paths can be added as well.
# NOTE: `spark`, `SparkFiles` and `io` are not defined in this cell; they come from earlier cells.

# Example of a resolved location seen in one run:
#/tmp/spark-3543400b-ce35-4e08-a562-93597cfab306/userFiles-a0f4eb7b-de8e-441a-bc44-9d24e90eb12c/srk2.mp3
spark.sparkContext.addFile("/content/srk2.mp3")

# SparkFiles.get() turns the bare file name into the path of Spark's copy; read that copy as raw bytes
with open(SparkFiles.get("srk2.mp3"), "rb") as f:
    audio_data = f.read()

# Decode the in-memory bytes with soundfile; sf.read returns the sample data and the sample rate
audio, samplerate = sf.read(io.BytesIO(audio_data))


In [None]:
# Import necessary modules
from pyspark.sql import SparkSession
from PIL import Image
import io

# Get the running Spark session, or start one for the image-processing demo
spark = (
    SparkSession.builder
    .appName("Parallel Image Processing")
    .getOrCreate()
)

# Input images -- the same sample file is listed twice; point these at your own files
image_paths = [
    "/content/image_cutout2.png",
    "/content/image_cutout2.png",
]

# Distribute the paths so each one can be handled by its own task
image_paths_rdd = spark.sparkContext.parallelize(image_paths)

# Function to read and process image
def read_and_process_image(file_path):
    """Load an image, resize it to 100x100 and return it encoded as JPEG.

    Args:
        file_path: Path of the image file to read. It is opened with the
            built-in ``open``, so it must be reachable from wherever the
            function runs.

    Returns:
        bytes: The JPEG-encoded 100x100 image.
    """
    with open(file_path, "rb") as f:
        img_data = f.read()
    image = Image.open(io.BytesIO(img_data))

    # The JPEG writer only accepts a handful of pixel modes. Previously only
    # 'RGBA' was converted, so e.g. palette ('P') or 'LA' images failed in
    # save(). Anything the writer cannot store is converted to 'RGB'; modes it
    # already accepts are left untouched.
    jpeg_writable_modes = ('1', 'L', 'RGB', 'RGBX', 'CMYK', 'YCbCr')
    if image.mode not in jpeg_writable_modes:
        image = image.convert('RGB')

    # Perform some operation on the image (here: resizing to a fixed 100x100)
    image_resized = image.resize((100, 100))

    # Encode into an in-memory buffer and hand back the raw bytes
    img_byte_arr = io.BytesIO()
    image_resized.save(img_byte_arr, format='JPEG')
    return img_byte_arr.getvalue()

# Declare the per-path transformation; Spark evaluates it lazily, when an action is called
processed_images_rdd = image_paths_rdd.map(read_and_process_image)

# collect() runs the job and brings every result back to the driver
# (use with caution; not recommended on large datasets)
processed_images = processed_images_rdd.collect()

# At this point, processed_images is a list with one JPEG byte string per input path


In [None]:
from pyspark.sql import SparkSession
from PIL import Image
import io
import os

# Get the running Spark session, or start one for the image-saving demo
spark = (
    SparkSession.builder
    .appName("Parallel Image Processing")
    .getOrCreate()
)

# Input images -- the same sample file is listed twice; point these at your own files
image_paths = [
    "/content/image_cutout2.png",
    "/content/image_cutout2.png",
]

# One RDD element per path, so the paths can be processed independently
image_paths_rdd = spark.sparkContext.parallelize(image_paths)

# Function to read and process image
def read_and_process_image(file_path):
    """Resize one image to 100x100 and save it as a JPEG in the working directory.

    Written for ``RDD.foreach``: the result goes to disk and nothing is returned.

    Args:
        file_path: Path of the image file to read. It is opened with the
            built-in ``open``, so it must be reachable from wherever the
            function runs.

    Returns:
        None. The image is written to ``./processed_<input name without
        extension>.jpg``.
    """
    with open(file_path, "rb") as f:
        img_data = f.read()
    image = Image.open(io.BytesIO(img_data))

    # The JPEG writer only accepts a handful of pixel modes. Previously only
    # 'RGBA' was converted, so e.g. palette ('P') or 'LA' images failed in
    # save(). Anything the writer cannot store is converted to 'RGB'; modes it
    # already accepts are left untouched.
    jpeg_writable_modes = ('1', 'L', 'RGB', 'RGBX', 'CMYK', 'YCbCr')
    if image.mode not in jpeg_writable_modes:
        image = image.convert('RGB')

    # Perform some operation on the image (here: resizing to a fixed 100x100)
    image_resized = image.resize((100, 100))

    # Encode into an in-memory buffer and take the raw JPEG bytes
    img_byte_arr = io.BytesIO()
    image_resized.save(img_byte_arr, format='JPEG')
    img_byte_arr = img_byte_arr.getvalue()

    # The payload is JPEG, so the output gets a .jpg extension instead of
    # inheriting the input's (a "processed_*.png" file holding JPEG data is
    # misleading to any tool that trusts the extension).
    stem = os.path.splitext(os.path.basename(file_path))[0]
    output_path = "./processed_" + stem + ".jpg"
    with open(output_path, 'wb') as f_out:
        f_out.write(img_byte_arr)

# foreach is an action: it runs read_and_process_image once per path for its side effect
# (writing the processed file) and returns nothing to the driver
image_paths_rdd.foreach(read_and_process_image)


In [None]:
from pyspark.sql import SparkSession
import soundfile as sf
import io

# Get the running Spark session, or start one for the audio-processing demo
spark = (
    SparkSession.builder
    .appName("Parallel Audio Processing using Collect")
    .getOrCreate()
)

# Input audio files -- the same sample file is listed twice; point these at your own files
audio_paths = [
    "/content/srk2.mp3",
    "/content/srk2.mp3",
]

# Distribute the paths so each one can be handled by its own task
audio_paths_rdd = spark.sparkContext.parallelize(audio_paths)

# Function to read and process audio
def read_and_process_audio(file_path):
    """Decode an audio file with soundfile and return the first 10 entries of its data.

    The sample rate reported by ``sf.read`` is not used.

    Args:
        file_path: Path of the audio file, passed straight to ``sf.read``.

    Returns:
        The leading slice ``[:10]`` of the decoded audio data.
    """
    samples, _samplerate = sf.read(file_path)
    # Stand-in for real processing: keep only the start of the signal
    return samples[:10]

# Declare the per-path transformation; Spark evaluates it lazily, when an action is called
processed_audio_rdd = audio_paths_rdd.map(read_and_process_audio)

# collect() runs the job and brings every result back to the driver
# (use with caution; not recommended on large datasets)
processed_audio = processed_audio_rdd.collect()

# At this point, processed_audio is a list with one processed audio slice per input path


In [None]:
from pyspark.sql import SparkSession
import soundfile as sf
import io
import os

# Get the running Spark session, or start one for the audio-saving demo
spark = (
    SparkSession.builder
    .appName("Parallel Audio Processing using Foreach")
    .getOrCreate()
)

# Input audio files -- the same sample file is listed twice; point these at your own files
audio_paths = [
    "/content/srk2.mp3",
    "/content/srk2.mp3",
]

# One RDD element per path, so the paths can be processed independently
audio_paths_rdd = spark.sparkContext.parallelize(audio_paths)

# Function to read and process audio
def read_and_process_audio(file_path):
    """Decode an audio file, keep its first 10 entries and write them back to disk.

    Written for ``RDD.foreach``: the result goes to disk and nothing is returned.

    Args:
        file_path: Path of the audio file, passed straight to ``sf.read``.

    Returns:
        None. The truncated audio is written to ``./processed_<input file name>``
        at the sample rate reported for the input.
    """
    samples, rate = sf.read(file_path)

    # The output keeps the input's base name, prefixed with "processed_", in the working directory
    destination = "./processed_" + os.path.basename(file_path)

    # Stand-in for real processing: only the start of the signal is kept
    sf.write(destination, samples[:10], rate)

# foreach is an action: it runs read_and_process_audio once per path for its side effect
# (writing the processed file) and returns nothing to the driver
audio_paths_rdd.foreach(read_and_process_audio)


In [None]:
from pyspark.sql import SparkSession

# Get the running Spark session, or start one for the Spark SQL walkthrough
spark = SparkSession.builder \
    .appName("Spark SQL Example") \
    .getOrCreate()

# Create a DataFrame from (Name, ID) tuples
data = [("Alice", 1), ("Bob", 2), ("Catherine", 3)]
columns = ["Name", "ID"]

df = spark.createDataFrame(data, columns)

# Expose the DataFrame to SQL under the name "people"
df.createOrReplaceTempView("people")

# Run a SQL query against the view. The query was previously built but never
# executed or displayed; show() runs it and prints the matching rows.
result = spark.sql("SELECT * FROM people WHERE ID > 1")
result.show()

# Two DataFrames that share an ID column
df1 = spark.createDataFrame([(1, "John"), (2, "Mike"), (3, "Sara")], ["ID", "Name"])
df2 = spark.createDataFrame([(1, "Math"), (2, "History"), (3, "Physics")], ["ID", "Subject"])

# Expose both to SQL
df1.createOrReplaceTempView("students")
df2.createOrReplaceTempView("subjects")

# SQL Query: Join students with subjects on ID
result_sql = spark.sql("SELECT students.Name, subjects.Subject FROM students JOIN subjects ON students.ID = subjects.ID")
result_sql.show()

# DataFrame API: the same join expressed without SQL
result_df = df1.join(df2, "ID").select("Name", "Subject")
result_df.show()


+----+-------+
|Name|Subject|
+----+-------+
|John|   Math|
|Mike|History|
|Sara|Physics|
+----+-------+

+----+-------+
|Name|Subject|
+----+-------+
|John|   Math|
|Mike|History|
|Sara|Physics|
+----+-------+



In [None]:
from pyspark.sql import SparkSession

# Get the running Spark session, or start one for the side-by-side comparison
spark = (
    SparkSession.builder
    .appName("Spark SQL and DataFrame API Examples")
    .getOrCreate()
)

# Sample data: students (ID, Name, Age) and the subject assigned to each ID
df1 = spark.createDataFrame(
    [(1, "John", 28), (2, "Mike", 22), (3, "Sara", 25)],
    ["ID", "Name", "Age"],
)
df2 = spark.createDataFrame(
    [(1, "Math"), (2, "History"), (3, "Physics")],
    ["ID", "Subject"],
)

# Make both DataFrames queryable from SQL
df1.createOrReplaceTempView("students")
df2.createOrReplaceTempView("subjects")

# ======================
# Part 1: Spark SQL
# ======================

# a. Every row of the students view
result1 = spark.sql("SELECT * FROM students")
result1.show()

# b. Only students older than 25
result2 = spark.sql("SELECT * FROM students WHERE Age > 25")
result2.show()

# c. Inner join of students and subjects on ID
result3 = spark.sql("SELECT students.Name, subjects.Subject FROM students JOIN subjects ON students.ID = subjects.ID")
result3.show()

# ======================
# Part 2: DataFrame API
# ======================

# a. Projection of a single column
df1.select(df1["Name"]).show()

# b. Same age filter as the SQL version
df1.where(df1["Age"] > 25).show()

# c. Same join as the SQL version
joined_df = df1.join(df2, on="ID").select("Name", "Subject")
joined_df.show()


+---+----+---+
| ID|Name|Age|
+---+----+---+
|  1|John| 28|
|  2|Mike| 22|
|  3|Sara| 25|
+---+----+---+

+---+----+---+
| ID|Name|Age|
+---+----+---+
|  1|John| 28|
+---+----+---+

+----+-------+
|Name|Subject|
+----+-------+
|John|   Math|
|Mike|History|
|Sara|Physics|
+----+-------+

+----+
|Name|
+----+
|John|
|Mike|
|Sara|
+----+

+---+----+---+
| ID|Name|Age|
+---+----+---+
|  1|John| 28|
+---+----+---+

+----+-------+
|Name|Subject|
+----+-------+
|John|   Math|
|Mike|History|
|Sara|Physics|
+----+-------+



In [None]:
from pyspark.sql import SparkSession

# Get the running Spark session, or start one for the parallel-execution demo
spark = (
    SparkSession.builder
    .appName("Spark SQL and DataFrame API Parallel Execution Example")
    .getOrCreate()
)

# Sample data: students (ID, Name, Age) and the subject assigned to each ID
df1 = spark.createDataFrame(
    [(1, "John", 28), (2, "Mike", 22), (3, "Sara", 25)],
    ["ID", "Name", "Age"],
)
df2 = spark.createDataFrame(
    [(1, "Math"), (2, "History"), (3, "Physics")],
    ["ID", "Subject"],
)

# Make both DataFrames queryable from SQL
df1.createOrReplaceTempView("students")
df2.createOrReplaceTempView("subjects")

# -------------- Parallel Execution Example --------------
# Neither statement below asks for parallelism explicitly; Spark distributes the work itself.

# The join written as SQL ...
result_sql = spark.sql("SELECT students.Name, subjects.Subject FROM students JOIN subjects ON students.ID = subjects.ID")

# ... and the same join written with the DataFrame API
result_df = df1.join(df2, on="ID").select("Name", "Subject")
# --------------------------------------------------------

# Custom partitioning: spread df1 over 50 partitions.
# (The repartitioned frame is not used by the statements below; it only illustrates the call.)
df1_repartitioned = df1.repartition(50)

# show() is used for demonstration only; a real application would usually go on to
# further transformations or actions instead.
result_sql.show()
result_df.show()

# Output: the joined rows, once from the SQL query and once from the DataFrame API


In [None]:
from pyspark.sql import SparkSession

# Get the running Spark session, or start one for the employee/salary example
spark = (
    SparkSession.builder
    .appName("Real-life Example of PySpark")
    .getOrCreate()
)

# Employees: (ID, Name, Department)
employee_rows = [
    (1, "Alice", "HR"),
    (2, "Bob", "Engineering"),
    (3, "Catherine", "Finance"),
]
employee_data = spark.createDataFrame(employee_rows, ["ID", "Name", "Department"])

# Salaries: (ID, Annual_Salary)
salary_rows = [
    (1, 50000),
    (2, 120000),
    (3, 90000),
]
salary_data = spark.createDataFrame(salary_rows, ["ID", "Annual_Salary"])

# Spread the employee rows over 50 partitions before joining
employee_data_repartitioned = employee_data.repartition(50)

# Attach each employee's salary by matching on ID
joined_data = employee_data_repartitioned.join(salary_data, on="ID")

# Keep only employees earning at least 60000 a year
filtered_data = joined_data.where(joined_data["Annual_Salary"] >= 60000)

# Display the result
filtered_data.show()

# Note: in a real-world scenario the filtered data might instead be saved to a database
# or another data storage system.

# Note: In a real-world scenario, you may also save this filtered data to a database or another data storage system.


+---+---------+-----------+-------------+
| ID|     Name| Department|Annual_Salary|
+---+---------+-----------+-------------+
|  2|      Bob|Engineering|       120000|
|  3|Catherine|    Finance|        90000|
+---+---------+-----------+-------------+



In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

# How long this demo listens before it shuts the stream down again (seconds).
# An unbounded awaitTermination() would block the notebook forever and leave the
# StreamingContext running, which prevents later cells from starting their own.
STREAM_RUN_SECONDS = 60

# Reuse the running SparkContext, or create one named "WordCountApp".
# getOrCreate() always returns a context (never None), so the application name has to be
# supplied through the conf; it only takes effect when a new context is created.
sc = SparkContext.getOrCreate(SparkConf().setAppName("WordCountApp"))
sc.setLogLevel("ERROR")

# Initialize Streaming Context with a batch interval of 1 second
ssc = StreamingContext(sc, 1)

# Create a DStream that connects to a TCP source on hostname:port.
# Something must be listening there (e.g. `nc -lk 9999` in a separate terminal).
lines = ssc.socketTextStream("localhost", 9999)

# Split lines into words and count each word in each batch
words = lines.flatMap(lambda line: line.split(" "))
wordCounts = words.countByValue()

# Print the count of each word for every batch
wordCounts.pprint()

# Start the computation, let it run for a bounded time, then stop only the streaming
# layer so the SparkContext stays usable for the rest of the notebook
ssc.start()
try:
    ssc.awaitTerminationOrTimeout(STREAM_RUN_SECONDS)
finally:
    ssc.stop(stopSparkContext=False, stopGraceFully=False)


In [None]:
! nc -lk 9999 #To send message in the console for streaming

In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

# How long this demo listens before it shuts the stream down again (seconds).
# An unbounded awaitTermination() would block the notebook forever.
STREAM_RUN_SECONDS = 60

# Reuse the running SparkContext, or create one named "RealTimeWordCount".
# getOrCreate() always returns a context (never None), so the application name has to be
# supplied through the conf; it only takes effect when a new context is created.
sc = SparkContext.getOrCreate(SparkConf().setAppName("RealTimeWordCount"))
sc.setLogLevel("ERROR")

# Only one StreamingContext may be active at a time; stop one left running by an
# earlier cell, but keep the shared SparkContext alive.
active_ssc = StreamingContext.getActive()
if active_ssc is not None:
    active_ssc.stop(stopSparkContext=False, stopGraceFully=False)

# Initialize Streaming Context with a batch interval of 10 seconds
ssc = StreamingContext(sc, 10)

# countByValueAndWindow updates its counts incrementally (adding the batch that enters
# the window and subtracting the one that leaves), which requires checkpointing.
ssc.checkpoint("streaming_checkpoint")

# Create a DStream that connects to a TCP source on hostname:port.
# Something must be listening there (e.g. `nc -lk 9999` in a separate terminal).
lines = ssc.socketTextStream("localhost", 9999)

# Split lines into words
words = lines.flatMap(lambda line: line.split(" "))

# Count each word within a window of 30 seconds, sliding every 10 seconds
windowedWordCounts = words.countByValueAndWindow(windowDuration=30, slideDuration=10)

# Print the count of each word
windowedWordCounts.pprint()

# Start the computation, let it run for a bounded time, then stop only the streaming
# layer so the SparkContext stays usable for the rest of the notebook
ssc.start()
try:
    ssc.awaitTerminationOrTimeout(STREAM_RUN_SECONDS)
finally:
    ssc.stop(stopSparkContext=False, stopGraceFully=False)


In [None]:
from pyspark.ml.feature import Imputer
# Load the CSV with a header row and let Spark infer the column types
df_pyspark = spark.read.csv('test.csv', header=True, inferSchema=True)

# Median imputation for three columns; each result lands in a new "<column>_imputed" column
imputer = Imputer(
    strategy="median",
    inputCols=['age', 'Experience', 'Salary'],
    outputCols=[c + "_imputed" for c in ['age', 'Experience', 'Salary']],
)

# Fit on the data, append the imputed columns and display the result
imputer.fit(df_pyspark).transform(df_pyspark).show()

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StandardScaler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

# Get the running Spark session, or start one for the tuning example
spark = SparkSession.builder.appName("LogisticRegressionPipeline").getOrCreate()

# Training data: four (label, 3-dimensional feature vector) rows.
# NOTE(review): four rows is very little for 3-fold cross-validation; this is a
# mechanics demo only -- confirm the folds are usable before trusting any metric.
training_data = spark.createDataFrame([
    (1.0, Vectors.dense([0.0, 1.1, 0.1])),
    (0.0, Vectors.dense([2.0, 1.1, -1.0])),
    (0.0, Vectors.dense([2.0, 1.3, 1.0])),
    (1.0, Vectors.dense([0.0, 1.2, -0.5]))
], ["label", "features"])

# Stage 1: scale "features" into "scaled_features"
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

# Stage 2: logistic regression trained on the scaled features
lr = LogisticRegression(featuresCol="scaled_features", labelCol="label")

# Chain both stages so they are fitted together
pipeline = Pipeline(stages=[scaler, lr])

# Hyperparameter grid: 3 values of maxIter x 3 values of regParam = 9 candidates
param_grid = ParamGridBuilder() \
    .addGrid(lr.maxIter, [10, 50, 100]) \
    .addGrid(lr.regParam, [0.01, 0.05, 0.1]) \
    .build()

# 3-fold cross-validation over the grid, scored by the binary classification evaluator
cross_val = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=BinaryClassificationEvaluator(),
    numFolds=3
)

# Fit every candidate and keep the best one
cv_model = cross_val.fit(training_data)

# The best model is a fitted pipeline; its stages mirror [scaler, lr]
best_model = cv_model.bestModel
scaler_stage = best_model.stages[0]
lr_stage = best_model.stages[1]

# Report the winning hyperparameters through the model's public getters
# (rather than reaching into the private _java_obj wrapper)
print(f"Best MaxIter: {lr_stage.getMaxIter()}")
print(f"Best RegParam: {lr_stage.getRegParam()}")


In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession

# Get the running Spark session, or start one for the simplified pipeline example
spark = SparkSession.builder.appName("SimplifiedLogisticRegressionPipeline").getOrCreate()

# Training data: four (label, 3-dimensional feature vector) rows
training_data = spark.createDataFrame([
    (1.0, Vectors.dense([0.0, 1.1, 0.1])),
    (0.0, Vectors.dense([2.0, 1.1, -1.0])),
    (0.0, Vectors.dense([2.0, 1.3, 1.0])),
    (1.0, Vectors.dense([0.0, 1.2, -0.5]))
], ["label", "features"])

# Stage 1: scale "features" into "scaled_features"
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")

# Stage 2: logistic regression trained on the scaled features
lr = LogisticRegression(featuresCol="scaled_features", labelCol="label")

# Chain both stages so they are fitted together
pipeline = Pipeline(stages=[scaler, lr])

# Fit the whole pipeline on the training data
model = pipeline.fit(training_data)

# The logistic regression model is the last stage of the fitted pipeline
lr_model = model.stages[-1]

# Report the hyperparameters through the model's public getters
# (rather than reaching into the private _java_obj wrapper)
print(f"MaxIter: {lr_model.getMaxIter()}")
print(f"RegParam: {lr_model.getRegParam()}")


-------------------------------------------
Time: 2023-09-06 09:24:10
-------------------------------------------

-------------------------------------------
Time: 2023-09-06 09:24:11
-------------------------------------------

-------------------------------------------
Time: 2023-09-06 09:24:12
-------------------------------------------

-------------------------------------------
Time: 2023-09-06 09:24:13
-------------------------------------------

-------------------------------------------
Time: 2023-09-06 09:24:14
-------------------------------------------

-------------------------------------------
Time: 2023-09-06 09:24:15
-------------------------------------------

-------------------------------------------
Time: 2023-09-06 09:24:16
-------------------------------------------

MaxIter: 100
RegParam: 0.0


In [None]:
!pip install graphframes
from pyspark.sql import SparkSession
from graphframes import GraphFrame

# Get the running Spark session, or start one for the graph example.
# NOTE(review): GraphFrames presumably also needs its JVM package on the Spark classpath,
# not just the Python wheel -- confirm the session was started with it.
spark = (
    SparkSession.builder
    .appName("GraphFramesExample")
    .getOrCreate()
)

# Vertices: one row per person, keyed by "id"
v = spark.createDataFrame(
    [
        ("1", "Alice"),
        ("2", "Bob"),
        ("3", "Charlie"),
        ("4", "David"),
    ],
    ["id", "name"],
)

# Edges: directed links src -> dst labelled with the kind of relationship
e = spark.createDataFrame(
    [
        ("1", "2", "friend"),
        ("2", "3", "follows"),
        ("3", "4", "friend"),
        ("4", "1", "follows"),
    ],
    ["src", "dst", "relationship"],
)

# Combine vertices and edges into a graph
g = GraphFrame(v, e)

# Run PageRank for 20 iterations with a reset probability of 0.01, then list each vertex's score
results = g.pageRank(resetProbability=0.01, maxIter=20)
results.vertices.select("id", "pagerank").show()


In [None]:
from pyspark import SparkConf, SparkContext

# Stop whichever SparkContext is currently running so the configuration below is
# actually applied. Going through getOrCreate() avoids depending on an `sc` variable
# left behind by an earlier cell.
SparkContext.getOrCreate().stop()

# Initialize a new SparkContext that uses the Kryo serializer
conf = SparkConf().setAppName("PerformanceTuningExample").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sc = SparkContext(conf=conf)

try:
    # Create an RDD holding the integers 1..99999
    rdd = sc.parallelize(range(1, 100000))

    # Mark the RDD for caching so the second action can reuse what the first one computed
    rdd.persist()

    # Two actions over the same RDD
    result1 = rdd.count()
    result2 = rdd.sum()

    print(result1,result2)
finally:
    # Release the context even if one of the actions above fails
    sc.stop()


99999 4999950000


In [None]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# Stop any existing SparkContext so the configuration below is actually applied
SparkContext.getOrCreate().stop()

# Context-level settings must be in place before the SparkContext is created, so the
# serializer and the executor memory both go into the SparkConf. (Setting
# spark.executor.memory on the session builder afterwards is too late to take effect.)
conf = (
    SparkConf()
    .setAppName("PerformanceTuningExample")
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .set("spark.executor.memory", "2g")
)
sc = SparkContext(conf=conf)

# Session on top of that context, with a custom number of shuffle partitions
spark = SparkSession.builder \
    .appName("PerformanceTuningExample") \
    .config("spark.sql.shuffle.partitions", "50") \
    .getOrCreate()

try:
    # Key/value pairs: every key (1, 2, 3) appears twice with value 1
    rdd = sc.parallelize([(1, 1), (2, 1), (3, 1), (1, 1), (2, 1), (3, 1)])

    # Using groupByKey (not recommended: all values are shuffled before being summed)
    grouped_result = rdd.groupByKey().mapValues(sum).collect()
    print("Result with groupByKey:", grouped_result)

    # Using reduceByKey (recommended: values are combined per key as part of the reduction)
    reduced_result = rdd.reduceByKey(lambda a, b: a + b).collect()
    print("Result with reduceByKey:", reduced_result)
finally:
    # Stopping the session also stops the SparkContext it wraps, so one call frees both
    spark.stop()


Result with groupByKey: [(2, 2), (1, 2), (3, 2)]
Result with reduceByKey: [(2, 2), (1, 2), (3, 2)]


In [None]:
!spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --name MySparkApp \
  --executor-memory 2G \
  --num-executors 3 \
  my_spark_app.py
#To deploy on the standalone cluster, a cloud environment, or a Hadoop YARN cluster.

In [None]:
''' Dockerfile
# Use a base image with Java
FROM openjdk:8-jdk-alpine

# Set environment variables for Spark
ENV SPARK_VERSION=3.1.2
ENV HADOOP_VERSION=3.2

# Install Spark
RUN apk add --no-cache wget && \
    wget https://downloads.apache.org/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz && \
    tar -xzf spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz && \
    mv spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION} /spark && \
    rm spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz

# Add Spark binaries to PATH
ENV PATH $PATH:/spark/bin
'''
#To install Spark using Docker

In [None]:
!spark-submit \
  --master k8s://<KUBERNETES_CLUSTER_ENDPOINT> \
  --deploy-mode cluster \
  --name my-spark-job \
  --class org.apache.spark.examples.SparkPi \
  --conf spark.executor.instances=3 \
  --conf spark.kubernetes.container.image=my-spark-image:latest \
  local:///path/to/spark/examples/jars/spark-examples_2.12-3.1.2.jar
