<a href="https://colab.research.google.com/github/mariumfaheem/Data-Science/blob/main/Spark_Treasure.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Spark Treasure**

In [None]:
pip install pyspark

# Spark SQL

In [None]:
import pandas as pd
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession


# Create (or reuse) a SparkContext; getOrCreate() avoids an error if this cell is re-run
sc = SparkContext.getOrCreate()

# Creating a spark session
spark = SparkSession \
    .builder \
    .appName("Python Spark DataFrames basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()
    

In [None]:
spark

In [None]:
# Read the file using `read_csv` function in pandas
mtcars = pd.read_csv('https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/mtcars.csv')


In [None]:
mtcars.rename( columns={'Unnamed: 0':'name'}, inplace=True )

In [None]:
sdf = spark.createDataFrame(mtcars) 

In [None]:
sdf.printSchema()

In [None]:
# Register the DataFrame as a temporary view; the "OrReplace" variant is safe to re-run
sdf.createOrReplaceTempView("cars")

In [None]:
#Showing the whole table
spark.sql("SELECT * FROM cars").show()


In [None]:
# Basic filtering query to determine cars that have a high mileage and low cylinder count
spark.sql("SELECT * FROM cars WHERE mpg > 20 AND cyl < 6").show(5)

In [None]:
# Aggregating data and grouping by cylinders
spark.sql("SELECT COUNT(*), cyl FROM cars GROUP BY cyl").show()

## Create a Pandas UDF to apply a columnar operation
Apache Spark has become the de facto standard for processing big data. To enable data scientists to leverage the value of big data, Spark added a Python API in version 0.7, with support for user-defined functions (UDFs). These user-defined functions operate one row at a time, and thus suffer from high serialization and invocation overhead. As a result, many data pipelines define UDFs in Java or Scala and then invoke them from Python.

Pandas UDFs, built on top of Apache Arrow, bring you the best of both worlds: the ability to define low-overhead, high-performance UDFs entirely in Python. In this simple example, we build a scalar Pandas UDF to convert the wt column from imperial units (1000 lb) to metric units (metric tons).

In addition, a regular Python function decorated with @pandas_udf() can be registered with spark.udf.register() and then invoked from SQL. We can then apply this UDF to our wt column.
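To make the batch-at-a-time idea concrete, the sketch below uses plain pandas, without Spark, to contrast a row-at-a-time conversion with the vectorized form that the body of a scalar Pandas UDF takes. The sample weights and the names `convert_row` and `convert_batch` are illustrative assumptions; in Spark the batches would arrive via Apache Arrow rather than being built by hand.

```python
import pandas as pd

# Approximate factor used in this notebook: 1000 lb is about 0.45 metric tons.
KLB_TO_TONNE = 0.45


def convert_row(wt: float) -> float:
    # Row-at-a-time style: invoked once for every value.
    return wt * KLB_TO_TONNE


def convert_batch(s: pd.Series) -> pd.Series:
    # Vectorized style: invoked once per batch, returns a Series of equal length.
    return s * KLB_TO_TONNE


# Hypothetical weights in units of 1000 lb.
batch = pd.Series([2.62, 3.44, 5.25])

row_result = pd.Series([convert_row(w) for w in batch])  # three function calls
batch_result = convert_batch(batch)                      # one function call

print(batch_result.tolist())
```

Both approaches produce the same numbers; the difference is how many Python function calls are needed to process the batch.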

In [None]:
# Import the pandas_udf decorator (PandasUDFType is not needed with type hints)
from pyspark.sql.functions import pandas_udf


In [None]:
@pandas_udf("float")
def convert_wt(s: pd.Series) -> pd.Series:
    # wt is in units of 1000 lb; multiplying by 0.45 (approximate) gives metric tons
    return s * 0.45

spark.udf.register("convert_weight", convert_wt)



In [None]:
spark.sql("SELECT *, wt AS weight_imperial, convert_weight(wt) as weight_metric FROM cars").show()

# Spark DataFrames

In [None]:
# Preview a few records
mtcars.head()

In [None]:
# We use the `createDataFrame` function to load the data into a spark dataframe
sdf = spark.createDataFrame(mtcars) 


In [None]:
# Let us look at the schema of the loaded spark dataframe
sdf.printSchema()

In [None]:
sdf.select('mpg').show(10)

## Filtering and Columnar operations
Filtering and column operations are important for selecting relevant data and applying useful transformations.

We first filter to retain only rows with mpg < 18. We use the filter() function for this.

In [None]:

sdf.filter(sdf['mpg']<18).show(5)
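The condition passed to filter() is a boolean column expression. pandas uses the same comparison style and the same `&` / `|` operators for combining conditions, so the logic can be tried locally before running it on Spark. The sketch below is a pandas-only illustration on hypothetical rows, not the real mtcars data.

```python
import pandas as pd

# Hypothetical rows standing in for mtcars.
cars = pd.DataFrame({
    "name": ["a", "b", "c", "d"],
    "mpg": [21.0, 15.2, 17.8, 30.4],
    "cyl": [6, 8, 6, 4],
})

# Single condition: keep rows with mpg below 18.
low_mpg = cars[cars["mpg"] < 18]

# Combined conditions: each comparison needs its own parentheses
# because & binds more tightly than < and ==.
low_mpg_six_cyl = cars[(cars["mpg"] < 18) & (cars["cyl"] == 6)]

print(low_mpg_six_cyl)
```

The same parenthesization rule applies when combining conditions on Spark columns, for example `(sdf['mpg'] < 18) & (sdf['cyl'] == 6)`.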

## Operating on Columns

Spark also provides a number of functions that can be applied directly to columns for data processing and aggregation. The example below uses basic arithmetic to convert the weight values from units of 1000 lb to metric tons. We create a new column called wtTon that holds the weight from the wt column converted to metric tons.

In [None]:
sdf.withColumn('wtTon', sdf['wt'] * 0.45 ).show(5)
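The multiplier 0.45 is a rounded conversion factor. As a quick check of the arithmetic, the sketch below derives the more precise value from the definition of the international pound (1 lb = 0.45359237 kg), assuming wt is expressed in units of 1000 lb. The function name `klb_to_tonnes` and the sample weight are illustrative.

```python
# One international avoirdupois pound is defined as exactly 0.45359237 kg.
KG_PER_LB = 0.45359237
KG_PER_TONNE = 1000.0


def klb_to_tonnes(wt_klb: float) -> float:
    """Convert a weight in units of 1000 lb to metric tons."""
    return wt_klb * 1000.0 * KG_PER_LB / KG_PER_TONNE


wt = 2.62  # illustrative weight in units of 1000 lb
exact = klb_to_tonnes(wt)
rounded = wt * 0.45  # the approximation used in this notebook
relative_error = (exact - rounded) / exact

print(f"{exact:.4f} t vs {rounded:.4f} t, relative error {relative_error:.2%}")
```

Rounding the factor to 0.45 understates each weight by a little under 1%, which is fine for a demonstration but worth replacing with 0.45359237 when precision matters.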

## Grouping and Aggregation
Spark DataFrames support a number of commonly used functions to aggregate data after grouping. In this example we compute the average weight of cars by their cylinders as shown below.

In [None]:
sdf.groupby(['cyl'])\
.agg({"wt": "AVG"})\
.show(5)

In [None]:
# Keep the aggregated DataFrame in a variable; show() only prints and returns None
car_counts = sdf.groupby(['cyl'])\
.agg({"wt": "count"})\
.sort("count(wt)", ascending=False)

car_counts.show(5)
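Grouped results like these are easy to cross-check in pandas on a small sample. The sketch below computes the average weight and the row count per cylinder value on hypothetical rows (not the real mtcars data); the output column names `avg_wt` and `n_cars` are illustrative choices.

```python
import pandas as pd

# Hypothetical rows standing in for mtcars; only the columns used here.
cars = pd.DataFrame({
    "cyl": [4, 4, 6, 6, 8],
    "wt": [2.0, 3.0, 3.0, 3.5, 4.0],
})

# Named aggregation: one output column per (source column, function) pair.
summary = (
    cars.groupby("cyl")
    .agg(avg_wt=("wt", "mean"), n_cars=("wt", "count"))
    .sort_values("n_cars", ascending=False)
)

print(summary)
```

Naming the aggregated columns explicitly avoids having to refer to generated names such as `count(wt)` when sorting.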