## Spark
- Driver and Worker Node(s)
- Code is composed of `Transformations` and `Actions`
- The lineage of `transformations` is collected as a __directed acyclic graph (DAG)__, which is executed only when the driver node calls an `Action`. With the help of the DAG, Spark can make key optimizations that reduce execution time and improve the performance of the cluster.
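
A minimal sketch of the transformation/action split, written in plain Python rather than Spark. `LazyPipeline` is a hypothetical toy class (not a Spark API): `map` and `filter` only record a step, and nothing runs until the `collect` action is called. It does not model the DAG optimizations, only the deferred execution.

```python
from typing import Any, Callable, Iterable, List, Tuple


class LazyPipeline:
    """Toy stand-in for a lazily evaluated dataset (not a Spark API)."""

    def __init__(self, source: Iterable[Any],
                 plan: Tuple[Tuple[str, Callable], ...] = ()):
        self._source = list(source)
        self._plan = plan  # recorded transformations, in order

    # Transformations: only record the step and return a new pipeline.
    def map(self, fn: Callable[[Any], Any]) -> "LazyPipeline":
        return LazyPipeline(self._source, self._plan + (("map", fn),))

    def filter(self, pred: Callable[[Any], bool]) -> "LazyPipeline":
        return LazyPipeline(self._source, self._plan + (("filter", pred),))

    # Action: run every recorded step and return a concrete result.
    def collect(self) -> List[Any]:
        rows = iter(self._source)
        for kind, fn in self._plan:
            rows = map(fn, rows) if kind == "map" else filter(fn, rows)
        return list(rows)


calls = []


def double(x):
    calls.append(x)  # side effect lets us observe when work really happens
    return x * 2


pipeline = LazyPipeline(range(5)).map(double).filter(lambda x: x > 4)
work_before_action = len(calls)  # still 0: nothing has run yet
result = pipeline.collect()      # the action triggers execution
print(work_before_action, result)
```

Because each transformation returns a new pipeline instead of mutating the old one, the recorded plan doubles as a description of how the result was derived.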

### Performance things to watch
- Reduce I/O between Nodes
- Monitor/manage data in memory

### RDD
- Lowest level API in Spark
- Strengths:
    - Fault Tolerant
        - Utilizes a restricted form of shared memory across a cluster of nodes
        - Based on coarse-grained transformations that are recorded as a lineage
        - As opposed to relying on fine-grained updates
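
The lineage idea can be sketched in plain Python. This is a toy model under stated assumptions, not how Spark implements RDDs: `source_partitions`, `lineage`, and `compute_partition` are hypothetical names, and a lost partition is rebuilt by replaying the recorded coarse-grained transformations on its source data.

```python
from typing import Callable, Dict, List

# Immutable source data, split into partitions (hypothetical layout).
source_partitions: Dict[int, List[int]] = {0: [1, 2, 3], 1: [4, 5, 6]}

# The lineage: an ordered list of coarse-grained transformations that
# apply to every record, rather than a log of per-record updates.
lineage: List[Callable[[List[int]], List[int]]] = [
    lambda part: [x * 10 for x in part],
    lambda part: [x for x in part if x > 10],
]


def compute_partition(partition_id: int) -> List[int]:
    """Rebuild one partition by replaying the lineage on its source data."""
    data = source_partitions[partition_id]
    for transform in lineage:
        data = transform(data)
    return data


# Materialize all partitions, as a worker's cache might hold them.
cache = {pid: compute_partition(pid) for pid in source_partitions}

# Simulate losing partition 1 (for example, a worker failure) ...
del cache[1]

# ... and recover it by recomputing only that partition.
cache[1] = compute_partition(1)
print(cache)
```

Only the lost partition is recomputed; partition 0 is untouched, which is what makes the lineage approach cheaper than replicating every update.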
        
#### When to use RDDs?
- When you are working with operations that need conversion at a __row/record level__ for special data structures ___(text or media files)___.
- When the _flexibility_ in __modifying data at a granular level__ is more important.
- When `schema` becomes _irrelevant_ to your use case but __parallelization__ can help.
- When you are not going to use the __domain-specific expressions__ (think of __Spark SQL__ abstractions)

#### When to avoid RDDs?
- When you want a schema
- When you want to use the Spark Catalyst optimizations along with column-name access.
- When you want to avoid complex coding constructs for simple API operations, e.g. finding the average frequency of words in a file. (Basically, with semi-structured or unstructured data.)



### DataFrames
- A higher-level abstraction over RDDs, powered by a schema that also allows Spark to perform more automated optimizations at runtime using the __Catalyst optimizer__.
- When joining two RDDs, converting them into DataFrames can be beneficial because the __Catalyst optimizer__ can then plan the join. Catalyst speeds up execution using Scala features such as quasiquotes and pattern matching.


#### DataFrames can be created in the following ways:
- from RDDs using the `inferSchema` option (or) using a `custom schema`.
- from files that are in different formats (`JSON, Parquet, CSV, Avro` etc.).
- from datasets using the implicit conversion `toDF` method.

#### Untyped nature of DataFrames: type checking at run time

DataFrames are collections of generic Row objects that hold the data, and those rows do have types. The word "untyped" refers to the time at which type checking is done with a DataFrame. It is ___done only at `run time`___, based on the schema that was inferred or defined by the user.
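
A small plain-Python sketch of what checking generic rows against a schema at run time looks like. This is an illustration only, not Spark code: `schema` and `validate_row` are hypothetical names, and the point is that a mismatched field is discovered only when the row is actually processed.

```python
from typing import Any, Dict, List, Tuple

# A schema maps column names to expected Python types (illustrative only).
schema: List[Tuple[str, type]] = [("a", int), ("b", float), ("c", str)]


def validate_row(row: Tuple[Any, ...],
                 schema: List[Tuple[str, type]]) -> Dict[str, Any]:
    """Check a generic row against the schema; failures surface at run time."""
    if len(row) != len(schema):
        raise TypeError(f"expected {len(schema)} fields, got {len(row)}")
    checked = {}
    for value, (name, expected) in zip(row, schema):
        if not isinstance(value, expected):
            raise TypeError(
                f"field {name!r}: expected {expected.__name__}, "
                f"got {type(value).__name__}"
            )
        checked[name] = value
    return checked


good = validate_row((1, 2.0, "string1"), schema)

try:
    validate_row((1, "oops", "string1"), schema)
    error = None
except TypeError as exc:
    error = str(exc)  # the mismatch is only found while processing the row

print(good)
print(error)
```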

#### Shortcomings of the DataFrame

- UDFs could not be used efficiently with the optimizer.
- It lacks the strong typing that can be achieved in Scala/Java.


### DataSets

#### Features of Datasets
- Seamless support for semi-structured data.
- Compile-time type-safety (Syntax + Analysis errors).
- Encoders increased the serialization/deserialization speed.
- Single API for Java & Scala.


**Datasets** were introduced in Spark release 1.6.0 (early 2016). They brought the advantage of **strong type checking** at ___compile time___.

Type safety was made possible by the introduction of **Encoders**, which convert a `JVM object` of `type T` to and from an `internal binary representation`. An encoder is also a serialization and deserialization (__SerDe__) framework. Encoders represent the schema of the records and enforce a mapping from a domain object to the internal binary representation, which avoids unnecessary conversions of JVM objects. They provide much faster conversions than __Java or Kryo serialization__.
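
The encoder idea can be approximated in plain Python with the standard `struct` module. This is a sketch under stated assumptions, not Spark's encoder implementation: `Reading`, `READING_LAYOUT`, `encode`, `decode`, and `read_value` are hypothetical names. It shows how a layout derived from the record's schema gives a compact binary form, and how one field can be read from the bytes without rebuilding the whole object.

```python
import struct
from dataclasses import dataclass


@dataclass
class Reading:
    sensor_id: int
    count: int
    value: float


# Fixed binary layout derived from the "schema" of Reading:
# little-endian int64, int32, float64 -> 8 + 4 + 8 = 20 bytes.
READING_LAYOUT = struct.Struct("<qid")


def encode(r: Reading) -> bytes:
    """Domain object -> binary representation."""
    return READING_LAYOUT.pack(r.sensor_id, r.count, r.value)


def decode(buf: bytes) -> Reading:
    """Binary representation -> domain object."""
    return Reading(*READING_LAYOUT.unpack(buf))


def read_value(buf: bytes) -> float:
    """Read one field straight from the bytes, without building a Reading."""
    (value,) = struct.unpack_from("<d", buf, 12)  # value starts after 8 + 4 bytes
    return value


reading = Reading(sensor_id=7, count=3, value=2.5)
encoded = encode(reading)
print(len(encoded), decode(encoded), read_value(encoded))
```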


## Unification of the Dataset and DataFrame APIs

Dataset and DataFrame were separate APIs until two of the musketeers combined to form the unified Dataset API in the Spark 2.0.0 release (mid-2016).

This unification brought the advantages of the RDD and DataFrame APIs under one umbrella.

- Dataframe became a type alias of `Dataset[Row]`. 
- In terms of languages, the DataFrame remained the primary abstraction in Python and R, as it is analogous to the `single-node dataframe` in those languages.
- In Scala & Java, **datasets** represented the `typed version` of the API and the **dataframe** is the `untyped version`.


#### When to use Unified Dataset [Dataframe / Dataset] API?
- When we are planning to use high-level, domain-specific operations (aggregations, joins etc.) with schema enforcement.
- Columnar access, lambda functions on semi-structured data.
- For a higher degree of type-safety check at compile-time, we can use the typed version of the unified dataset API.
- To benefit from the tungsten code generation (faster expression evaluation using dataframe/SQL operators).
- R users are recommended to use Dataframes (Dataset is not available).
- Python users can also use Dataframes (Dataset is not available).

## Pros and Cons

#### Dataset:

- pros: has optimized operations over column-oriented storage
- pros: many operations also don't need deserialization
- pros: provides table/SQL semantics if you like them (I don't ;)
- pros: dataset operations come with an optimization engine, "Catalyst", that improves the performance of your code (I'm not sure it is really that great. If you know what your code does to the data, it should already be optimized)
- cons: most operations lose typing
- cons: dataset operations can become too complicated for complex algorithms that don't suit them. The 2 main limits I know are managing invalid data and complex math algorithms.


#### Dataframe:

- pros: required between dataset operations that lose type
- cons: just use Dataset it has all the advantages and more


#### RDD:
- pros: (really) strongly typed
- pros: Scala/Java semantics. You can design your code pretty much as you would for a standard app that processes in-memory collections. Well, with functional semantics :)
- cons: full JVM deserialization is required to process data at every step mentioned before: after reading input, and between all processing steps that require data to be moved between workers or stored locally to stay within memory bounds.


#### Conclusion

Just use **Dataset** by default:

- read input with an Encoder; if the data format allows it, the input schema is validated at the start
- use dataset operations, and when you lose type, go back to a typed dataset.
- Typically, use typed datasets as the input and output of all methods.

There are cases where what you want to code would be too complex to express using dataset operations. Most apps don't hit them, but it often happens in my work, where I implement complex mathematical models. In this case:
- start with a dataset
- filter and shuffle (groupBy, join) data as much as possible with dataset ops
- once you have only the required data and no longer need to move it, convert to an RDD and apply your complex computation.



In [1]:
import findspark

findspark.init()

In [2]:
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
sc



In [3]:
dir(sc)

['PACKAGE_EXTENSIONS',
 '__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__enter__',
 '__eq__',
 '__exit__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__getnewargs__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_accumulatorServer',
 '_active_spark_context',
 '_assert_on_driver',
 '_batchSize',
 '_callsite',
 '_checkpointFile',
 '_conf',
 '_dictToJavaMap',
 '_do_init',
 '_encryption_enabled',
 '_ensure_initialized',
 '_gateway',
 '_getJavaStorageLevel',
 '_initialize_context',
 '_javaAccumulator',
 '_jsc',
 '_jvm',
 '_lock',
 '_next_accum_id',
 '_pickled_broadcast_vars',
 '_python_includes',
 '_repr_html_',
 '_serialize_to_jvm',
 '_temp_dir',
 '_unbatched_serializer',
 'accumulator',
 'addFile',
 'addPyFile',
 'appName',
 'applicationId',
 'binaryFiles',
 'bina

In [5]:
type(sc)

pyspark.context.SparkContext

In [6]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

In [7]:
type(spark)

pyspark.sql.session.SparkSession

In [8]:
dir(spark)

['Builder',
 '__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__enter__',
 '__eq__',
 '__exit__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_activeSession',
 '_convert_from_pandas',
 '_createFromLocal',
 '_createFromRDD',
 '_create_dataframe',
 '_create_from_pandas_with_arrow',
 '_create_shell_session',
 '_get_numpy_record_dtype',
 '_inferSchema',
 '_inferSchemaFromList',
 '_instantiatedSession',
 '_jsc',
 '_jsparkSession',
 '_jvm',
 '_jwrapped',
 '_repr_html_',
 '_sc',
 '_wrapped',
 'builder',
 'catalog',
 'conf',
 'createDataFrame',
 'getActiveSession',
 'newSession',
 'range',
 'read',
 'readStream',
 'sparkContext',
 'sql',
 'stop',
 'streams',
 'table',
 'udf',
 'version']

In [9]:
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row

In [10]:
df = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [11]:
df.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  4|5.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



In [16]:
# or create a dataframe using the `schema` parameter

df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [17]:
df.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



In [18]:
# create a PySpark Dataframe from a Pandas DataFrame

pandas_df = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
df = spark.createDataFrame(pandas_df)
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [19]:
df.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



In [20]:
# create a PySpark Dataframe from a RDD which is composed of a list of tuples

rdd = spark.sparkContext.parallelize([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
])
df = spark.createDataFrame(rdd, schema=['a', 'b', 'c', 'd', 'e'])
df


DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [21]:
df.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



In [22]:
# All of the DataFrames above produce the same result.
df.show()
df.printSchema()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



In [23]:
df.show(vertical=True)

-RECORD 0------------------
 a   | 1                   
 b   | 2.0                 
 c   | string1             
 d   | 2000-01-01          
 e   | 2000-01-01 12:00:00 
-RECORD 1------------------
 a   | 2                   
 b   | 3.0                 
 c   | string2             
 d   | 2000-02-01          
 e   | 2000-01-02 12:00:00 
-RECORD 2------------------
 a   | 3                   
 b   | 4.0                 
 c   | string3             
 d   | 2000-03-01          
 e   | 2000-01-03 12:00:00 



In [24]:
df.columns

['a', 'b', 'c', 'd', 'e']

In [26]:
df.describe().show()

+-------+---+---+-------+
|summary|  a|  b|      c|
+-------+---+---+-------+
|  count|  3|  3|      3|
|   mean|2.0|3.0|   null|
| stddev|1.0|1.0|   null|
|    min|  1|2.0|string1|
|    max|  3|4.0|string3|
+-------+---+---+-------+



**`DataFrame.collect()`** brings the distributed data to the `driver side` as local Python data. Because it pulls all the data from the executors to the driver, it can throw an **out-of-memory error** when the dataset is too large to fit on the `driver side`.

In [27]:
df.collect()

[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)),
 Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0)),
 Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]

**`PySpark DataFrame`** also provides conversion back to a pandas DataFrame so that you can use the pandas API. Note that `toPandas` ___also collects all data___ onto the **driver side**, which can easily cause an `out-of-memory-error` when the data is too large to fit there.

In [28]:
df.toPandas()

   a    b        c           d                   e
0  1  2.0  string1  2000-01-01 2000-01-01 12:00:00
1  2  3.0  string2  2000-02-01 2000-01-02 12:00:00
2  3  4.0  string3  2000-03-01 2000-01-03 12:00:00


In [29]:
# A PySpark DataFrame is lazily evaluated. Simply selecting a column does not
# trigger any computation; it returns a Column instance.

from pyspark.sql import Column
from pyspark.sql.functions import upper

type(df.c) == type(upper(df.c)) == type(df.c.isNull())

True

In [31]:
# These Columns can be used to select columns from a DataFrame.
# For example, DataFrame.select() takes Column instances and returns another DataFrame.

df.select(df.e).show()

+-------------------+
|                  e|
+-------------------+
|2000-01-01 12:00:00|
|2000-01-02 12:00:00|
|2000-01-03 12:00:00|
+-------------------+



In [32]:
# to add a new column of data
df.withColumn('upper_c', upper(df.c)).show()

+---+---+-------+----------+-------------------+-------+
|  a|  b|      c|         d|                  e|upper_c|
+---+---+-------+----------+-------------------+-------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|
+---+---+-------+----------+-------------------+-------+



In [33]:
df.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



To select a subset of rows, use **`DataFrame.filter()`**.

In [35]:
df.filter(df.b > 1).show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



In [36]:
# applying a function
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf('long')
def pandas_plus_one(series: pd.Series) -> pd.Series:
    # Add one to every value using vectorized pandas Series arithmetic.
    return series + 1

df.select(pandas_plus_one(df.a)).show()

+------------------+
|pandas_plus_one(a)|
+------------------+
|                 2|
|                 3|
|                 4|
+------------------+



In [37]:
@pandas_udf('string')  # declared return type is 'string', but the function returns a numeric Series
def pandas_plus_one(series: pd.Series) -> pd.Series:
    # Add one to every value; the int64 result does not match 'string', so Arrow raises a type error.
    return series + 1

df.select(pandas_plus_one(df.a)).show()

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "pyarrow/array.pxi", line 915, in pyarrow.lib.Array.from_pandas
  File "pyarrow/array.pxi", line 312, in pyarrow.lib.array
  File "pyarrow/array.pxi", line 83, in pyarrow.lib._ndarray_to_array
  File "pyarrow/error.pxi", line 122, in pyarrow.lib.check_status
pyarrow.lib.ArrowTypeError: Expected a string or bytes dtype, got int64


In [40]:
df.schema

StructType(List(StructField(a,LongType,true),StructField(b,DoubleType,true),StructField(c,StringType,true),StructField(d,DateType,true),StructField(e,TimestampType,true)))

In [41]:
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)



In [42]:
df.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



In [44]:
@pandas_udf('long')
def pandas_plus_one(series: pd.Series) -> pd.Series:
    # Add one to every value using vectorized pandas Series arithmetic.
    return series + 1


# df.b is a double, but the result comes back as the declared 'long' return type
df.select(pandas_plus_one(df.b)).show()

+------------------+
|pandas_plus_one(b)|
+------------------+
|                 3|
|                 4|
|                 5|
+------------------+



In [45]:
@pandas_udf('double')
def pandas_plus_one(series: pd.Series) -> pd.Series:
    # Add one to every value using vectorized pandas Series arithmetic.
    return series + 1


# df.a is a long, but the result comes back as the declared 'double' return type
df.select(pandas_plus_one(df.a)).show()

+------------------+
|pandas_plus_one(a)|
+------------------+
|               2.0|
|               3.0|
|               4.0|
+------------------+



## Grouping Data

In [46]:
df = spark.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])
df.show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



In [48]:
df.groupby("color").sum().show()

+-----+-------+-------+
|color|sum(v1)|sum(v2)|
+-----+-------+-------+
|  red|     24|    240|
| blue|      6|     60|
|black|      6|     60|
+-----+-------+-------+



In [51]:
df.groupby("color").agg({"color": 'count', 'v1': "sum", 'v2': 'mean'}).show()  # the dict form cannot alias the output columns

+-----+------------+-------+-------+
|color|count(color)|sum(v1)|avg(v2)|
+-----+------------+-------+-------+
|  red|           5|     24|   48.0|
| blue|           2|      6|   30.0|
|black|           1|      6|   60.0|
+-----+------------+-------+-------+



In [70]:
# Note: importing sum and round here shadows Python's built-ins of the same name.
from pyspark.sql.functions import mean, sum, stddev, count, column, round

In [61]:
df.groupby("color").agg(count('color').alias('Count_of_color'),
                       mean("v1").alias("v1_avg"),
                       stddev('v1').alias("stddev_v1")).show()

+-----+--------------+------+------------------+
|color|Count_of_color|v1_avg|         stddev_v1|
+-----+--------------+------+------------------+
|  red|             5|   4.8|2.8635642126552705|
| blue|             2|   3.0|1.4142135623730951|
|black|             1|   6.0|              null|
+-----+--------------+------+------------------+
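
The `null` for `black` is what the sample standard deviation (n - 1 in the denominator) gives for a group with a single row. A plain-Python check of the numbers above, assuming Spark's `stddev` is the sample standard deviation; `sample_stddev` is a hypothetical helper, not a Spark function:

```python
import math

# v1 values per color, copied from the DataFrame above.
v1_by_color = {
    "red": [1, 3, 5, 7, 8],
    "blue": [2, 4],
    "black": [6],
}


def sample_stddev(values):
    """Sample standard deviation (n - 1 denominator); None when n < 2."""
    n = len(values)
    if n < 2:
        return None  # dividing by n - 1 = 0 is undefined
    mean = sum(values) / n
    return math.sqrt(sum((x - mean) ** 2 for x in values) / (n - 1))


stddev_by_color = {color: sample_stddev(vals) for color, vals in v1_by_color.items()}
print(stddev_by_color)
```

For `red`, the squared deviations from the mean 4.8 sum to 32.8, and sqrt(32.8 / 4) is roughly 2.8636, which agrees with the table.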



In [71]:
df.groupby(["color", "v1", "v2"]).agg(count('color').alias('Count_of_color'),
                       mean("v1").alias("v1_avg"),
                       stddev('v1').alias("stddev_v1"),
                       (column('v1') * column('v2')).alias('product_v1_v2')).show()

+-----+---+---+--------------+------+---------+-------------+
|color| v1| v2|Count_of_color|v1_avg|stddev_v1|product_v1_v2|
+-----+---+---+--------------+------+---------+-------------+
|  red|  1| 10|             1|   1.0|     null|           10|
| blue|  2| 20|             1|   2.0|     null|           40|
|  red|  3| 30|             1|   3.0|     null|           90|
| blue|  4| 40|             1|   4.0|     null|          160|
|  red|  5| 50|             1|   5.0|     null|          250|
|black|  6| 60|             1|   6.0|     null|          360|
|  red|  7| 70|             1|   7.0|     null|          490|
|  red|  8| 80|             1|   8.0|     null|          640|
+-----+---+---+--------------+------+---------+-------------+



In [73]:
df.groupby(["color"]).agg(count('color').alias('Count_of_color'),
                       mean("v1").alias("v1_avg"),
                       round(stddev('v1'),3).alias("stddev_v1"),).show()

+-----+--------------+------+---------+
|color|Count_of_color|v1_avg|stddev_v1|
+-----+--------------+------+---------+
|  red|             5|   4.8|    2.864|
| blue|             2|   3.0|    1.414|
|black|             1|   6.0|     null|
+-----+--------------+------+---------+



In [50]:
df.agg({"color": 'count', 'v1': "sum", 'v2': 'mean'}).show()

+------------+-------+-------+
|count(color)|sum(v1)|avg(v2)|
+------------+-------+-------+
|           8|     36|   45.0|
+------------+-------+-------+



In [74]:
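def subtract_mean(pandas_df):
    # Center v1 within each group by subtracting the group's mean.
    return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())

# schema=df.schema keeps v1 as a long, so the centered values appear as integers in the output.
df.groupby('color').applyInPandas(subtract_mean, schema=df.schema).show()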

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|black|carrot|  0| 60|
| blue|banana| -1| 20|
| blue| grape|  1| 40|
|  red|banana| -3| 10|
|  red|carrot| -1| 30|
|  red|carrot|  0| 50|
|  red|banana|  2| 70|
|  red| grape|  3| 80|
+-----+------+---+---+
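
The integer values in the `v1` column above are consistent with the centered floats being truncated toward zero when they are written back into a `long` column. A plain-Python check for the `red` group; this reproduces the numbers only and makes no claim about how Spark or Arrow performs the cast internally:

```python
# v1 values for the 'red' group, copied from the DataFrame above.
red_v1 = [1, 3, 5, 7, 8]

mean_v1 = sum(red_v1) / len(red_v1)       # 4.8
centered = [x - mean_v1 for x in red_v1]  # floating-point differences
as_long = [int(x) for x in centered]      # int() truncates toward zero
print(centered)
print(as_long)
```

Declaring `v1` as `double` in the output schema would be one way to keep the fractional part.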



In [82]:
df1 = spark.createDataFrame(data=
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    schema=('time', 'id', 'v1'))

df2 = spark.createDataFrame(
    [(20000101, 1, 'x'), (20000101, 2, 'y')],
    ('time', 'id', 'v2'))

def asof_join(l, r):
    return pd.merge_asof(l, r, on='time', by='id')

print("df1")
df1.show()
print("df2")
df2.show()
print("df1 left join df2 nearest match")
(df1.groupby('id').cogroup(df2.groupby('id'))
            .applyInPandas(asof_join, 
                           schema='time int, id int, v1 double, v2 string').show())

df1
+--------+---+---+
|    time| id| v1|
+--------+---+---+
|20000101|  1|1.0|
|20000101|  2|2.0|
|20000102|  1|3.0|
|20000102|  2|4.0|
+--------+---+---+

df2
+--------+---+---+
|    time| id| v2|
+--------+---+---+
|20000101|  1|  x|
|20000101|  2|  y|
+--------+---+---+

df1 left join df2 nearest match
+--------+---+---+---+
|    time| id| v1| v2|
+--------+---+---+---+
|20000101|  1|1.0|  x|
|20000102|  1|3.0|  x|
|20000101|  2|2.0|  y|
|20000102|  2|4.0|  y|
+--------+---+---+---+



In [78]:
help(pd.merge_asof)

Help on function merge_asof in module pandas.core.reshape.merge:

merge_asof(left: 'DataFrame | Series', right: 'DataFrame | Series', on: 'IndexLabel | None' = None, left_on: 'IndexLabel | None' = None, right_on: 'IndexLabel | None' = None, left_index: 'bool' = False, right_index: 'bool' = False, by=None, left_by=None, right_by=None, suffixes: 'Suffixes' = ('_x', '_y'), tolerance=None, allow_exact_matches: 'bool' = True, direction: 'str' = 'backward') -> 'DataFrame'
    Perform a merge by key distance.
    
    This is similar to a left-join except that we match on nearest
    key rather than equal keys. Both DataFrames must be sorted by the key.
    
    For each row in the left DataFrame:
    
      - A "backward" search selects the last row in the right DataFrame whose
        'on' key is less than or equal to the left's key.
    
      - A "forward" search selects the first row in the right DataFrame whose
        'on' key is greater than or equal to the left's key.
    
      - A 
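
The "backward" search described in the help text above can be sketched with the standard library. This is an illustration of the matching rule only, not pandas' implementation: `asof_backward` is a hypothetical helper that, for each left key, picks the last right row whose key is less than or equal to it.

```python
from bisect import bisect_right
from typing import List, Optional, Tuple


def asof_backward(left_keys: List[int],
                  right: List[Tuple[int, str]]) -> List[Tuple[int, Optional[str]]]:
    """For each left key, take the value of the last right row with key <= it.

    Both inputs must be sorted by key, mirroring the requirement quoted above.
    """
    right_keys = [k for k, _ in right]
    out = []
    for key in left_keys:
        idx = bisect_right(right_keys, key)  # number of right keys <= key
        out.append((key, right[idx - 1][1] if idx > 0 else None))
    return out


# Rows for id == 1 from df1 and df2 above.
left_times = [20000101, 20000102]
right_rows = [(20000101, "x")]

matched = asof_backward(left_times, right_rows)
unmatched = asof_backward([5], [(10, "z")])  # no right key <= 5
print(matched, unmatched)
```

This is why both `df1` rows for `id` 1 pick up `x` in the joined output: 20000102 has no exact match in `df2`, so the search falls back to the nearest earlier key.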

## Write/Read Data

In [83]:
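# Spark writes a directory named 'getting_started.csv' containing part files, not a single file.
df.write.csv('getting_started.csv', header=True)
# Without a schema or inferSchema=True, every column is read back as a string.
spark.read.csv('getting_started.csv', header=True).show()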

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|black|carrot|  6| 60|
| blue|banana|  2| 20|
|  red|banana|  1| 10|
|  red|carrot|  3| 30|
|  red|banana|  7| 70|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|  red| grape|  8| 80|
+-----+------+---+---+

