# PySpark

## PySpark Overview

PySpark is the Python API for Apache Spark. It enables you to perform real-time, large-scale data processing in a distributed environment using Python. It also provides a PySpark shell for interactively analyzing your data.

<img src='./images/PySpark_overview.png' width=50% height=50%>

## Spark SQL and DataFrames

Spark SQL allows you to seamlessly mix SQL queries with Spark programs.

PySpark DataFrames are implemented on top of RDDs. They are lazily evaluated. Transformations do not compute anything immediately. Instead, Spark records a plan of how to compute the result and executes it only when an action such as `collect()` is called.

- [Quickstart: DataFrame](https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html)
- [Live Notebook: DataFrame](https://mybinder.org/v2/gh/apache/spark/bf45fad170?filepath=python%2Fdocs%2Fsource%2Fgetting_started%2Fquickstart_df.ipynb)
- [Spark SQL API Reference](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/index.html)

**Why use PySpark instead of pandas?**

| PySpark | Pandas |
| :------ | :----- |
| Large datasets that need parallel processing | Small datasets that can be handled on a single machine |
| Integrates with other big data tools | - |

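```python
from pyspark.sql import SparkSession

# Create a SparkSession (or reuse the active one): the entry point for DataFrame and SQL functionality
spark = SparkSession.builder.getOrCreate()
```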



```python
from datetime import datetime, date
from pyspark.sql import Row

# Each Row becomes one record; Spark infers the column types from the Python values
df = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df
# Output: DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]
```


## Pandas API on Spark

The pandas **API on Spark** lets you scale your pandas workload beyond a single machine by running it distributed across multiple nodes. It lets you migrate existing pandas applications with little or no code change.

- [Quickstart: Pandas API on Spark](https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_ps.html)
- [Live Notebook: pandas API on Spark](https://mybinder.org/v2/gh/apache/spark/bf45fad170?filepath=python%2Fdocs%2Fsource%2Fgetting_started%2Fquickstart_ps.ipynb)
- [Pandas API on Spark Reference](https://spark.apache.org/docs/latest/api/python/reference/pyspark.pandas/index.html)

## Structured Streaming

Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive.


- [Structured Streaming Programming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)
- [Structured Streaming API Reference](https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss/index.html)

## Machine Learning (MLlib)

Built on top of Spark, MLlib is a scalable machine learning library that provides a uniform set of high-level APIs that help users create and tune practical machine learning pipelines.

- [Machine Learning Library (MLlib) Programming Guide](https://spark.apache.org/docs/latest/ml-guide.html)
- [Machine Learning (MLlib) API Reference](https://spark.apache.org/docs/latest/api/python/reference/pyspark.ml.html)

## Spark Core and RDDs

Spark Core is the underlying general execution engine for the Spark platform that all other functionality is built on top of. It provides RDDs (Resilient Distributed Datasets) and in-memory computing capabilities.

Note that the RDD API is a low-level API that can be difficult to use, and it does not benefit from Spark's automatic query optimization. We recommend using DataFrames (see Spark SQL and DataFrames above) instead of RDDs. DataFrames let you express what you want more easily, and Spark automatically constructs the most efficient query for you.

- [Spark Core API Reference](https://spark.apache.org/docs/latest/api/python/reference/pyspark.html)