In [0]:
!pip install pyspark



#RDD

In [0]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext.getOrCreate()

In [0]:
help(sc.parallelize)

Help on method parallelize in module pyspark.context:

parallelize(c: Iterable[~T], numSlices: Union[int, NoneType] = None) -> pyspark.rdd.RDD[~T] method of pyspark.context.SparkContext instance
    Distribute a local Python collection to form an RDD. Using range
    is recommended if the input represents a range for performance.
    
    Examples
    --------
    >>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect()
    [[0], [2], [3], [4], [6]]
    >>> sc.parallelize(range(0, 6, 2), 5).glom().collect()
    [[], [0], [], [2], [4]]



In [0]:
list1 =sc.parallelize (
   ["scala",
   "java",
   "hadoop",
   "spark",
    "spark",
   "akka",
   "spark vs hadoop",
   "pyspark",
   "pyspark and spark"]
)

In [0]:
type(list1)

In [0]:
list1.count()

9

In [0]:
list1.collect()

['scala',
 'java',
 'hadoop',
 'spark',
 'spark',
 'akka',
 'spark vs hadoop',
 'pyspark',
 'pyspark and spark']

In [0]:
words_filter = list1.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()

In [0]:
words_filter

PythonRDD[2] at collect at <ipython-input-7-3556160135>:2

In [0]:
print(filtered)

['spark', 'spark', 'spark vs hadoop', 'pyspark', 'pyspark and spark']


In [0]:
list1.map(lambda x: x.upper()).collect()

['SCALA',
 'JAVA',
 'HADOOP',
 'SPARK',
 'SPARK',
 'AKKA',
 'SPARK VS HADOOP',
 'PYSPARK',
 'PYSPARK AND SPARK']

In [0]:
words_map = list1.map(lambda x: (x, 1))
mapping = words_map.collect()

In [0]:
print(mapping)

[('scala', 1), ('java', 1), ('hadoop', 1), ('spark', 1), ('spark', 1), ('akka', 1), ('spark vs hadoop', 1), ('pyspark', 1), ('pyspark and spark', 1)]


In [0]:
words_map.countByKey()

defaultdict(int,
            {'scala': 1,
             'java': 1,
             'hadoop': 1,
             'spark': 2,
             'akka': 1,
             'spark vs hadoop': 1,
             'pyspark': 1,
             'pyspark and spark': 1})

#Dataframes

In [0]:
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

In [0]:
spark

In [0]:
help(spark.createDataFrame)

Help on method createDataFrame in module pyspark.sql.session:

createDataFrame(data: Union[pyspark.rdd.RDD[Any], Iterable[Any], ForwardRef('PandasDataFrameLike')], schema: Union[pyspark.sql.types.AtomicType, pyspark.sql.types.StructType, str, NoneType] = None, samplingRatio: Union[float, NoneType] = None, verifySchema: bool = True) -> pyspark.sql.dataframe.DataFrame method of pyspark.sql.session.SparkSession instance
    Creates a :class:`DataFrame` from an :class:`RDD`, a list or a :class:`pandas.DataFrame`.
    
    When ``schema`` is a list of column names, the type of each column
    will be inferred from ``data``.
    
    When ``schema`` is ``None``, it will try to infer the schema (column names and types)
    from ``data``, which should be an RDD of either :class:`Row`,
    :class:`namedtuple`, or :class:`dict`.
    
    When ``schema`` is :class:`pyspark.sql.types.DataType` or a datatype string, it must match
    the real data, or an exception will be thrown at runtime. If the 

In [0]:
ages_list = [21, 23, 18, 41, 32]

In [0]:
type(ages_list)

list

In [0]:
type(ages_list[2])

int

In [0]:
spark.createDataFrame(ages_list)

[0;31m---------------------------------------------------------------------------[0m
[0;31mPySparkTypeError[0m                          Traceback (most recent call last)
File [0;32m<command-5404265235422420>, line 1[0m
[0;32m----> 1[0m spark[38;5;241m.[39mcreateDataFrame(ages_list)

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:47[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     45[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     46[0m [38;5;28;01mtry[39;00m:
[0;32m---> 47[0m     res [38;5;241m=[39m func([38;5;241m*[39margs, [38;5;241m*[39m[38;5;241m*[39mkwargs)
[1;32m     48[0m     logger[38;5;241m.[39mlog_success(
[1;32m     49[0m         module_name, class_name, function_name, time[38;5;241m.[39mperf_counter() [38;5;241m-[39m start, signature
[1;32m     50[0m     )
[1;32m     51[0m     [38;5;28;01mreturn[39;00m res

File [0;32m/databricks/spark/python/pysp

In [0]:
spark.createDataFrame(ages_list,'int')

DataFrame[value: int]

In [0]:
from pyspark.sql.types import IntegerType

In [0]:
spark.createDataFrame(ages_list, IntegerType()).show()

+-----+
|value|
+-----+
|   21|
|   23|
|   18|
|   41|
|   32|
+-----+



###Multi Column Spark Dataframe using Python list

In [0]:
ages_list = [(21, ), (23, ), (41, ), (32, )]

In [0]:
type(ages_list)

list

In [0]:
type(ages_list[2])

tuple

In [0]:
spark.createDataFrame(ages_list).show()

+---+
| _1|
+---+
| 21|
| 23|
| 41|
| 32|
+---+



In [0]:
spark.createDataFrame(ages_list, 'int')

[0;31m---------------------------------------------------------------------------[0m
[0;31mPySparkTypeError[0m                          Traceback (most recent call last)
File [0;32m<command-5404265235422429>, line 1[0m
[0;32m----> 1[0m spark[38;5;241m.[39mcreateDataFrame(ages_list, [38;5;124m'[39m[38;5;124mint[39m[38;5;124m'[39m)

File [0;32m/databricks/spark/python/pyspark/instrumentation_utils.py:47[0m, in [0;36m_wrap_function.<locals>.wrapper[0;34m(*args, **kwargs)[0m
[1;32m     45[0m start [38;5;241m=[39m time[38;5;241m.[39mperf_counter()
[1;32m     46[0m [38;5;28;01mtry[39;00m:
[0;32m---> 47[0m     res [38;5;241m=[39m func([38;5;241m*[39margs, [38;5;241m*[39m[38;5;241m*[39mkwargs)
[1;32m     48[0m     logger[38;5;241m.[39mlog_success(
[1;32m     49[0m         module_name, class_name, function_name, time[38;5;241m.[39mperf_counter() [38;5;241m-[39m start, signature
[1;32m     50[0m     )
[1;32m     51[0m     [38;5;28;01mreturn

In [0]:
spark.createDataFrame(ages_list, 'age int').show()

+---+
|age|
+---+
| 21|
| 23|
| 41|
| 32|
+---+



In [0]:
users_list = [(1, 'Scott'), (2, 'Donald'), (3, 'Mickey'), (4, 'Elvis')]

In [0]:
spark.createDataFrame(users_list, 'id int, name string').show()

+---+------+
| id|  name|
+---+------+
|  1| Scott|
|  2|Donald|
|  3|Mickey|
|  4| Elvis|
+---+------+



In [0]:
spark.createDataFrame(users_list, 'user_id int, user_first_name string').show()

+-------+---------------+
|user_id|user_first_name|
+-------+---------------+
|      1|          Scott|
|      2|         Donald|
|      3|         Mickey|
|      4|          Elvis|
+-------+---------------+



In [0]:
df=spark.createDataFrame(users_list, 'user_id int, user_first_name string')

In [0]:
df.show()

+-------+---------------+
|user_id|user_first_name|
+-------+---------------+
|      1|          Scott|
|      2|         Donald|
|      3|         Mickey|
|      4|          Elvis|
+-------+---------------+



In [0]:
df.collect()

[Row(user_id=1, user_first_name='Scott'),
 Row(user_id=2, user_first_name='Donald'),
 Row(user_id=3, user_first_name='Mickey'),
 Row(user_id=4, user_first_name='Elvis')]

In [0]:
type(df.collect())

list

In [0]:
from pyspark.sql import Row

In [0]:
help(Row)

In [0]:
r = Row("Alice", 11) #def a(*d):  a(1,3,4,5)

In [0]:
r

<Row('Alice', 11)>

In [0]:
row2 = Row(name="Alice", age=11) # def a(**d)

In [0]:
row2.name

'Alice'

In [0]:
row2['name']

'Alice'

In [0]:
r[0]

'Alice'

In [0]:
r==row2

True

#Different ways to create dataframe

In [0]:
users_list = [[1, 'Scott'], [2, 'Donald'], [3, 'Mickey'], [4, 'Elvis']]

In [0]:
spark.createDataFrame(users_list, ['user_id' , 'user_first_name ']).show()

+-------+----------------+
|user_id|user_first_name |
+-------+----------------+
|      1|           Scott|
|      2|          Donald|
|      3|          Mickey|
|      4|           Elvis|
+-------+----------------+



In [0]:
from pyspark.sql import Row

In [0]:
users_rows = [Row(*user) for user in users_list]

In [0]:
users_rows

[<Row(1, 'Scott')>, <Row(2, 'Donald')>, <Row(3, 'Mickey')>, <Row(4, 'Elvis')>]

In [0]:
spark.createDataFrame(users_rows, 'user_id int, user_first_name string').show()

+-------+---------------+
|user_id|user_first_name|
+-------+---------------+
|      1|          Scott|
|      2|         Donald|
|      3|         Mickey|
|      4|          Elvis|
+-------+---------------+



In [0]:
def dummy(*args):
    print(args)
    print(len(args))

In [0]:
dummy(1)

(1,)
1


In [0]:
dummy(1, 'Hello')

(1, 'Hello')
2


In [0]:
user_details = [1, 'Scott']

In [0]:
dummy(user_details)

([1, 'Scott'],)
1


In [0]:
dummy(*user_details)

(1, 'Scott')
2


In [0]:
#simallar we can perform list of tuples

In [0]:
users_list = [(1, 'Scott'), (2, 'Donald'), (3, 'Mickey'), (4, 'Elvis')]

In [0]:
spark.createDataFrame(users_list, 'user_id int, user_first_name string').show()

+-------+---------------+
|user_id|user_first_name|
+-------+---------------+
|      1|          Scott|
|      2|         Donald|
|      3|         Mickey|
|      4|          Elvis|
+-------+---------------+



In [0]:
users_rows = [Row(*user) for user in users_list]

In [0]:
spark.createDataFrame(users_rows, 'user_id int, user_first_name string').show()

+-------+---------------+
|user_id|user_first_name|
+-------+---------------+
|      1|          Scott|
|      2|         Donald|
|      3|         Mickey|
|      4|          Elvis|
+-------+---------------+



In [0]:
users_list = [
    {'user_id': 1, 'user_first_name': 'Scott'},
    {'user_id': 2, 'user_first_name': 'Donald'},
    {'user_id': 3, 'user_first_name': 'Mickey'},
    {'user_id': 4, 'user_first_name': 'Elvis'}
]

In [0]:
spark.createDataFrame(users_list)

DataFrame[user_first_name: string, user_id: bigint]

In [0]:
from pyspark.sql import Row

In [0]:
user_details = users_list[1]

In [0]:
user_details

{'user_id': 2, 'user_first_name': 'Donald'}

In [0]:
Row(*user_details.values())

<Row(2, 'Donald')>

In [0]:
users_rows = [Row(*user.values()) for user in users_list]

In [0]:
users_rows

[<Row(1, 'Scott')>, <Row(2, 'Donald')>, <Row(3, 'Mickey')>, <Row(4, 'Elvis')>]

In [0]:
spark.createDataFrame(users_rows, 'user_id bigint, user_first_name string').show()

+-------+---------------+
|user_id|user_first_name|
+-------+---------------+
|      1|          Scott|
|      2|         Donald|
|      3|         Mickey|
|      4|          Elvis|
+-------+---------------+



In [0]:
users_rows = [Row(**user) for user in users_list]

In [0]:
users_rows

[Row(user_id=1, user_first_name='Scott'),
 Row(user_id=2, user_first_name='Donald'),
 Row(user_id=3, user_first_name='Mickey'),
 Row(user_id=4, user_first_name='Elvis')]

In [0]:
spark.createDataFrame(users_rows)

DataFrame[user_id: bigint, user_first_name: string]

In [0]:
def dummy(**kwargs):
    print(kwargs)
    print(len(kwargs))

In [0]:
user_details = {'user_id': 1, 'user_first_name': 'Scott'}

In [0]:
dummy(user_details=user_details)

{'user_details': {'user_id': 1, 'user_first_name': 'Scott'}}
1


In [0]:
dummy(**user_details)

{'user_id': 1, 'user_first_name': 'Scott'}
2


In [0]:
dummy(user_id=1, user_first_name='Scott')

{'user_id': 1, 'user_first_name': 'Scott'}
2




#Selecting columns Spark DataFrame


In [0]:
from pyspark.sql import Row
import pandas as pd

In [0]:
import datetime
users = [
    {
        "id": 1,
        "first_name": "Corrie",
        "last_name": "Van den Oord",
        "email": "cvandenoord0@etsy.com",
        "phone_numbers": Row(mobile="+1 234 567 8901", home="+1 234 567 8911"),
        "courses": [1, 2],
        "is_customer": True,
        "amount_paid": 1000.55,
        "customer_from": datetime.date(2021, 1, 15),
        "last_updated_ts": datetime.datetime(2021, 2, 10, 1, 15, 0)
    },
    {
        "id": 2,
        "first_name": "Nikolaus",
        "last_name": "Brewitt",
        "email": "nbrewitt1@dailymail.co.uk",
        "phone_numbers":  Row(mobile="+1 234 567 8923", home="1 234 567 8934"),
        "courses": [3],
        "is_customer": True,
        "amount_paid": 900.0,
        "customer_from": datetime.date(2021, 2, 14),
        "last_updated_ts": datetime.datetime(2021, 2, 18, 3, 33, 0)
    },
    {
        "id": 3,
        "first_name": "Orelie",
        "last_name": "Penney",
        "email": "openney2@vistaprint.com",
        "phone_numbers": Row(mobile="+1 714 512 9752", home="+1 714 512 6601"),
        "courses": [2, 4],
        "is_customer": True,
        "amount_paid": 850.55,
        "customer_from": datetime.date(2021, 1, 21),
        "last_updated_ts": datetime.datetime(2021, 3, 15, 15, 16, 55)
    },
    {
        "id": 4,
        "first_name": "Ashby",
        "last_name": "Maddocks",
        "email": "amaddocks3@home.pl",
        "phone_numbers": Row(mobile=None, home=None),
        "courses": [],
        "is_customer": False,
        "amount_paid": None,
        "customer_from": None,
        "last_updated_ts": datetime.datetime(2021, 4, 10, 17, 45, 30)
    },
    {
        "id": 5,
        "first_name": "Kurt",
        "last_name": "Rome",
        "email": "krome4@shutterfly.com",
        "phone_numbers": Row(mobile="+1 817 934 7142", home=None),
        "courses": [],
        "is_customer": False,
        "amount_paid": None,
        "customer_from": None,
        "last_updated_ts": datetime.datetime(2021, 4, 2, 0, 55, 28)
    }
]

In [0]:
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)
users_df = spark.createDataFrame(pd.DataFrame(users))

In [0]:
users_df = spark.createDataFrame(pd.DataFrame(users))

In [0]:
users_df.show()

+---+----------+------------+--------------------+--------------------+-------+-----------+-----------+-------------+-------------------+
| id|first_name|   last_name|               email|       phone_numbers|courses|is_customer|amount_paid|customer_from|    last_updated_ts|
+---+----------+------------+--------------------+--------------------+-------+-----------+-----------+-------------+-------------------+
|  1|    Corrie|Van den Oord|cvandenoord0@etsy...|[+1 234 567 8901,...| [1, 2]|       true|    1000.55|   2021-01-15|2021-02-10 01:15:00|
|  2|  Nikolaus|     Brewitt|nbrewitt1@dailyma...|[+1 234 567 8923,...|    [3]|       true|      900.0|   2021-02-14|2021-02-18 03:33:00|
|  3|    Orelie|      Penney|openney2@vistapri...|[+1 714 512 9752,...| [2, 4]|       true|     850.55|   2021-01-21|2021-03-15 15:16:55|
|  4|     Ashby|    Maddocks|  amaddocks3@home.pl|        [NULL, NULL]|     []|      false|       NULL|         NULL|2021-04-10 17:45:30|
|  5|      Kurt|        Rome|krome

**Overview of Narrow and Wide Transformations**

Let us get an overview of Narrow and Wide Transformations.
* Here are the functions related to narrow transformations. Narrow transformations doesn't result in shuffling. These are also known as row level transformations.
  * `df.select`
  * `df.filter`
  * `df.withColumn`
  * `df.withColumnRenamed`
  * `df.drop`
* Here are the functions related to wide transformations.
  * `df.distinct`
  * `df.union` or any set operation
  * `df.join` or any join operation
  * `df.groupBy`
  * `df.sort` or `df.orderBy`
* Any function that result in shuffling is wide transformation. For all the wide transformations, we have to deal with group of records based on a key.

**To run a notebook in Databricks**

In [0]:
help(users_df.select)

Help on method select in module pyspark.sql.dataframe:

select(*cols: 'ColumnOrName') -> 'DataFrame' method of pyspark.sql.dataframe.DataFrame instance
    Projects a set of expressions and returns a new :class:`DataFrame`.
    
    .. versionadded:: 1.3.0
    
    Parameters
    ----------
    cols : str, :class:`Column`, or list
        column names (string) or expressions (:class:`Column`).
        If one of the column names is '*', that column is expanded to include all columns
        in the current :class:`DataFrame`.
    
    Examples
    --------
    >>> df.select('*').collect()
    [Row(age=2, name='Alice'), Row(age=5, name='Bob')]
    >>> df.select('name', 'age').collect()
    [Row(name='Alice', age=2), Row(name='Bob', age=5)]
    >>> df.select(df.name, (df.age + 10).alias('age')).collect()
    [Row(name='Alice', age=12), Row(name='Bob', age=15)]



In [0]:
users_df.select('*').show()

+---+----------+------------+--------------------+--------------------+-------+-----------+-----------+-------------+-------------------+
| id|first_name|   last_name|               email|       phone_numbers|courses|is_customer|amount_paid|customer_from|    last_updated_ts|
+---+----------+------------+--------------------+--------------------+-------+-----------+-----------+-------------+-------------------+
|  1|    Corrie|Van den Oord|cvandenoord0@etsy...|[+1 234 567 8901,...| [1, 2]|       true|    1000.55|   2021-01-15|2021-02-10 01:15:00|
|  2|  Nikolaus|     Brewitt|nbrewitt1@dailyma...|[+1 234 567 8923,...|    [3]|       true|      900.0|   2021-02-14|2021-02-18 03:33:00|
|  3|    Orelie|      Penney|openney2@vistapri...|[+1 714 512 9752,...| [2, 4]|       true|     850.55|   2021-01-21|2021-03-15 15:16:55|
|  4|     Ashby|    Maddocks|  amaddocks3@home.pl|        [NULL, NULL]|     []|      false|       NULL|         NULL|2021-04-10 17:45:30|
|  5|      Kurt|        Rome|krome

In [0]:
users_df.select('id', 'first_name', 'last_name').show()

+---+----------+------------+
| id|first_name|   last_name|
+---+----------+------------+
|  1|    Corrie|Van den Oord|
|  2|  Nikolaus|     Brewitt|
|  3|    Orelie|      Penney|
|  4|     Ashby|    Maddocks|
|  5|      Kurt|        Rome|
+---+----------+------------+



In [0]:
users_df.select(['id', 'first_name', 'last_name']).show()

+---+----------+------------+
| id|first_name|   last_name|
+---+----------+------------+
|  1|    Corrie|Van den Oord|
|  2|  Nikolaus|     Brewitt|
|  3|    Orelie|      Penney|
|  4|     Ashby|    Maddocks|
|  5|      Kurt|        Rome|
+---+----------+------------+



In [0]:
users_df.alias('u').select('u.*').show()

+---+----------+------------+--------------------+--------------------+-------+-----------+-----------+-------------+-------------------+
| id|first_name|   last_name|               email|       phone_numbers|courses|is_customer|amount_paid|customer_from|    last_updated_ts|
+---+----------+------------+--------------------+--------------------+-------+-----------+-----------+-------------+-------------------+
|  1|    Corrie|Van den Oord|cvandenoord0@etsy...|[+1 234 567 8901,...| [1, 2]|       true|    1000.55|   2021-01-15|2021-02-10 01:15:00|
|  2|  Nikolaus|     Brewitt|nbrewitt1@dailyma...|[+1 234 567 8923,...|    [3]|       true|      900.0|   2021-02-14|2021-02-18 03:33:00|
|  3|    Orelie|      Penney|openney2@vistapri...|[+1 714 512 9752,...| [2, 4]|       true|     850.55|   2021-01-21|2021-03-15 15:16:55|
|  4|     Ashby|    Maddocks|  amaddocks3@home.pl|        [NULL, NULL]|     []|      false|       NULL|         NULL|2021-04-10 17:45:30|
|  5|      Kurt|        Rome|krome

In [0]:
users_df.alias('u').select('u.id','u.first_name','u.last_name').show()

+---+----------+------------+
| id|first_name|   last_name|
+---+----------+------------+
|  1|    Corrie|Van den Oord|
|  2|  Nikolaus|     Brewitt|
|  3|    Orelie|      Penney|
|  4|     Ashby|    Maddocks|
|  5|      Kurt|        Rome|
+---+----------+------------+



In [0]:
from pyspark.sql.functions import col

In [0]:
col('id').cast('String')

Column<'CAST(id AS STRING)'>

In [0]:
type('id')

str

In [0]:
users_df.is_customer.id

Column<'is_customer[id]'>

In [0]:
users_df.select(col('id'),users_df['first_name'],'last_name',users_df.is_customer,users_df.id+10,col('id')+10).show()

+---+----------+------------+-----------+---------+---------+
| id|first_name|   last_name|is_customer|(id + 10)|(id + 10)|
+---+----------+------------+-----------+---------+---------+
|  1|    Corrie|Van den Oord|       true|       11|       11|
|  2|  Nikolaus|     Brewitt|       true|       12|       12|
|  3|    Orelie|      Penney|       true|       13|       13|
|  4|     Ashby|    Maddocks|      false|       14|       14|
|  5|      Kurt|        Rome|      false|       15|       15|
+---+----------+------------+-----------+---------+---------+



In [0]:
users_df.select(
    'id',
    'first_name',
    'last_name',
).show()

+---+----------+------------+
| id|first_name|   last_name|
+---+----------+------------+
|  1|    Corrie|Van den Oord|
|  2|  Nikolaus|     Brewitt|
|  3|    Orelie|      Penney|
|  4|     Ashby|    Maddocks|
|  5|      Kurt|        Rome|
+---+----------+------------+



In [0]:
users_df.select(
    'id'+10,
    'first_name',
    'last_name',
).show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
File [0;32m<command-5404265235422503>, line 2[0m
[1;32m      1[0m users_df[38;5;241m.[39mselect(
[0;32m----> 2[0m     [38;5;124m'[39m[38;5;124mid[39m[38;5;124m'[39m[38;5;241m+[39m[38;5;241m10[39m,
[1;32m      3[0m     [38;5;124m'[39m[38;5;124mfirst_name[39m[38;5;124m'[39m,
[1;32m      4[0m     [38;5;124m'[39m[38;5;124mlast_name[39m[38;5;124m'[39m,
[1;32m      5[0m )[38;5;241m.[39mshow()

[0;31mTypeError[0m: can only concatenate str (not "int") to str

In [0]:
users_df.select(
    (col('id')+10),
    'first_name',
    'last_name',
).show()

+---------+----------+------------+
|(id + 10)|first_name|   last_name|
+---------+----------+------------+
|       11|    Corrie|Van den Oord|
|       12|  Nikolaus|     Brewitt|
|       13|    Orelie|      Penney|
|       14|     Ashby|    Maddocks|
|       15|      Kurt|        Rome|
+---------+----------+------------+



In [0]:
users_df.select(
    (col('id')+10).alias('id'),
    'first_name',
    'last_name',
).show()

+---+----------+------------+
| id|first_name|   last_name|
+---+----------+------------+
| 11|    Corrie|Van den Oord|
| 12|  Nikolaus|     Brewitt|
| 13|    Orelie|      Penney|
| 14|     Ashby|    Maddocks|
| 15|      Kurt|        Rome|
+---+----------+------------+



In [0]:
from pyspark.sql.functions import col, concat, lit

In [0]:
users_df.select(
    'id',
    'first_name',
    'last_name',
    concat(col('first_name'), lit(', '), col('last_name')).alias('full_name')
).show()

+---+----------+------------+--------------------+
| id|first_name|   last_name|           full_name|
+---+----------+------------+--------------------+
|  1|    Corrie|Van den Oord|Corrie, Van den Oord|
|  2|  Nikolaus|     Brewitt|   Nikolaus, Brewitt|
|  3|    Orelie|      Penney|      Orelie, Penney|
|  4|     Ashby|    Maddocks|     Ashby, Maddocks|
|  5|      Kurt|        Rome|          Kurt, Rome|
+---+----------+------------+--------------------+



#selectExpr


In [0]:
from pyspark.sql import Row
import pandas as pd

In [0]:
import datetime
users = [
    {
        "id": 1,
        "first_name": "Corrie",
        "last_name": "Van den Oord",
        "email": "cvandenoord0@etsy.com",
        "phone_numbers": Row(mobile="+1 234 567 8901", home="+1 234 567 8911"),
        "courses": [1, 2],
        "is_customer": True,
        "amount_paid": 1000.55,
        "customer_from": datetime.date(2021, 1, 15),
        "last_updated_ts": datetime.datetime(2021, 2, 10, 1, 15, 0)
    },
    {
        "id": 2,
        "first_name": "Nikolaus",
        "last_name": "Brewitt",
        "email": "nbrewitt1@dailymail.co.uk",
        "phone_numbers":  Row(mobile="+1 234 567 8923", home="1 234 567 8934"),
        "courses": [3],
        "is_customer": True,
        "amount_paid": 900.0,
        "customer_from": datetime.date(2021, 2, 14),
        "last_updated_ts": datetime.datetime(2021, 2, 18, 3, 33, 0)
    },
    {
        "id": 3,
        "first_name": "Orelie",
        "last_name": "Penney",
        "email": "openney2@vistaprint.com",
        "phone_numbers": Row(mobile="+1 714 512 9752", home="+1 714 512 6601"),
        "courses": [2, 4],
        "is_customer": True,
        "amount_paid": 850.55,
        "customer_from": datetime.date(2021, 1, 21),
        "last_updated_ts": datetime.datetime(2021, 3, 15, 15, 16, 55)
    },
    {
        "id": 4,
        "first_name": "Ashby",
        "last_name": "Maddocks",
        "email": "amaddocks3@home.pl",
        "phone_numbers": Row(mobile=None, home=None),
        "courses": [],
        "is_customer": False,
        "amount_paid": None,
        "customer_from": None,
        "last_updated_ts": datetime.datetime(2021, 4, 10, 17, 45, 30)
    },
    {
        "id": 5,
        "first_name": "Kurt",
        "last_name": "Rome",
        "email": "krome4@shutterfly.com",
        "phone_numbers": Row(mobile="+1 817 934 7142", home=None),
        "courses": [],
        "is_customer": False,
        "amount_paid": None,
        "customer_from": None,
        "last_updated_ts": datetime.datetime(2021, 4, 2, 0, 55, 28)
    }
]

In [0]:
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)

In [0]:
users_df = spark.createDataFrame(pd.DataFrame(users))

In [0]:
help(users_df.selectExpr)

Help on method selectExpr in module pyspark.sql.dataframe:

selectExpr(*expr: Union[str, List[str]]) -> 'DataFrame' method of pyspark.sql.dataframe.DataFrame instance
    Projects a set of SQL expressions and returns a new :class:`DataFrame`.
    
    This is a variant of :func:`select` that accepts SQL expressions.
    
    .. versionadded:: 1.3.0
    
    Examples
    --------
    >>> df.selectExpr("age * 2", "abs(age)").collect()
    [Row((age * 2)=4, abs(age)=2), Row((age * 2)=10, abs(age)=5)]



In [0]:
from pyspark.sql.functions import col,lit,concat

In [0]:
users_df.selectExpr('*').show(truncate=False)

+---+----------+------------+-------------------------+----------------------------------+-------+-----------+-----------+-------------+-------------------+
|id |first_name|last_name   |email                    |phone_numbers                     |courses|is_customer|amount_paid|customer_from|last_updated_ts    |
+---+----------+------------+-------------------------+----------------------------------+-------+-----------+-----------+-------------+-------------------+
|1  |Corrie    |Van den Oord|cvandenoord0@etsy.com    |[+1 234 567 8901, +1 234 567 8911]|[1, 2] |true       |1000.55    |2021-01-15   |2021-02-10 01:15:00|
|2  |Nikolaus  |Brewitt     |nbrewitt1@dailymail.co.uk|[+1 234 567 8923, 1 234 567 8934] |[3]    |true       |900.0      |2021-02-14   |2021-02-18 03:33:00|
|3  |Orelie    |Penney      |openney2@vistaprint.com  |[+1 714 512 9752, +1 714 512 6601]|[2, 4] |true       |850.55     |2021-01-21   |2021-03-15 15:16:55|
|4  |Ashby     |Maddocks    |amaddocks3@home.pl       |[NU

In [0]:
# Defining alias to the dataframe
users_df.alias('u').selectExpr('u.*').show()

+---+----------+------------+--------------------+--------------------+-------+-----------+-----------+-------------+-------------------+
| id|first_name|   last_name|               email|       phone_numbers|courses|is_customer|amount_paid|customer_from|    last_updated_ts|
+---+----------+------------+--------------------+--------------------+-------+-----------+-----------+-------------+-------------------+
|  1|    Corrie|Van den Oord|cvandenoord0@etsy...|[+1 234 567 8901,...| [1, 2]|       true|    1000.55|   2021-01-15|2021-02-10 01:15:00|
|  2|  Nikolaus|     Brewitt|nbrewitt1@dailyma...|[+1 234 567 8923,...|    [3]|       true|      900.0|   2021-02-14|2021-02-18 03:33:00|
|  3|    Orelie|      Penney|openney2@vistapri...|[+1 714 512 9752,...| [2, 4]|       true|     850.55|   2021-01-21|2021-03-15 15:16:55|
|  4|     Ashby|    Maddocks|  amaddocks3@home.pl|        [NULL, NULL]|     []|      false|       NULL|         NULL|2021-04-10 17:45:30|
|  5|      Kurt|        Rome|krome

In [0]:
users_df.selectExpr('id','last_name','first_name').show()

+---+------------+----------+
| id|   last_name|first_name|
+---+------------+----------+
|  1|Van den Oord|    Corrie|
|  2|     Brewitt|  Nikolaus|
|  3|      Penney|    Orelie|
|  4|    Maddocks|     Ashby|
|  5|        Rome|      Kurt|
+---+------------+----------+



In [0]:
users_df.selectExpr(['id', 'first_name', 'last_name']).show()

+---+----------+------------+
| id|first_name|   last_name|
+---+----------+------------+
|  1|    Corrie|Van den Oord|
|  2|  Nikolaus|     Brewitt|
|  3|    Orelie|      Penney|
|  4|     Ashby|    Maddocks|
|  5|      Kurt|        Rome|
+---+----------+------------+



In [0]:
type(col('id'))

pyspark.sql.column.Column

In [0]:
type(lit(5)+col('id'))

pyspark.sql.column.Column

In [0]:
users_df. \
    select(
        'id', 'first_name', 'last_name',
        concat(col('first_name'), lit(', '), col('last_name')).alias('full_name')
    ). \
    show()

+---+----------+------------+--------------------+
| id|first_name|   last_name|           full_name|
+---+----------+------------+--------------------+
|  1|    Corrie|Van den Oord|Corrie, Van den Oord|
|  2|  Nikolaus|     Brewitt|   Nikolaus, Brewitt|
|  3|    Orelie|      Penney|      Orelie, Penney|
|  4|     Ashby|    Maddocks|     Ashby, Maddocks|
|  5|      Kurt|        Rome|          Kurt, Rome|
+---+----------+------------+--------------------+



In [0]:
# Using selectExpr to use Spark SQL Functions
users_df.selectExpr('id', 'first_name', 'last_name', '(first_name, ", ", last_name) AS full_name').show()

+---+----------+------------+--------------------+
| id|first_name|   last_name|           full_name|
+---+----------+------------+--------------------+
|  1|    Corrie|Van den Oord|{Corrie, , , Van ...|
|  2|  Nikolaus|     Brewitt|{Nikolaus, , , Br...|
|  3|    Orelie|      Penney|{Orelie, , , Penney}|
|  4|     Ashby|    Maddocks|{Ashby, , , Maddo...|
|  5|      Kurt|        Rome|    {Kurt, , , Rome}|
+---+----------+------------+--------------------+



In [0]:
users_df.createOrReplaceTempView('users')

In [0]:
spark.sql("""
    SELECT  first_name, last_name,
        concat(first_name, ', ', last_name) AS full_name
    FROM users
""").show()

+----------+------------+--------------------+
|first_name|   last_name|           full_name|
+----------+------------+--------------------+
|    Corrie|Van den Oord|Corrie, Van den Oord|
|  Nikolaus|     Brewitt|   Nikolaus, Brewitt|
|    Orelie|      Penney|      Orelie, Penney|
|     Ashby|    Maddocks|     Ashby, Maddocks|
|      Kurt|        Rome|          Kurt, Rome|
+----------+------------+--------------------+



In [0]:
users_df['id']

Column<'id'>

In [0]:
from  pyspark.sql.functions import col

In [0]:
col('id')

Column<'id'>

In [0]:
type(users_df['id']),type(col('id'))

(pyspark.sql.column.Column, pyspark.sql.column.Column)

In [0]:
users_df.select('id', col('id')+lit(5), 'last_name').show()

+---+--------+------------+
| id|(id + 5)|   last_name|
+---+--------+------------+
|  1|       6|Van den Oord|
|  2|       7|     Brewitt|
|  3|       8|      Penney|
|  4|       9|    Maddocks|
|  5|      10|        Rome|
+---+--------+------------+



In [0]:
# This does not work as selectExpr can only take column names or SQL style expressions on column names
users_df.selectExpr('id', col('first_name'), 'last_name').show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
[0;32m<ipython-input-130-81740d509864>[0m in [0;36m<module>[0;34m[0m
[1;32m      1[0m [0;31m# This does not work as selectExpr can only take column names or SQL style expressions on column names[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 2[0;31m [0musers_df[0m[0;34m.[0m[0mselectExpr[0m[0;34m([0m[0;34m'id'[0m[0;34m,[0m [0mcol[0m[0;34m([0m[0;34m'first_name'[0m[0;34m)[0m[0;34m,[0m [0;34m'last_name'[0m[0;34m)[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/usr/local/lib/python3.7/dist-packages/pyspark/sql/dataframe.py[0m in [0;36mselectExpr[0;34m(self, *expr)[0m
[1;32m   2046[0m         [0;32mif[0m [0mlen[0m[0;34m([0m[0mexpr[0m[0;34m)[0m [0;34m==[0m [0;36m1[0m [0;32mand[0m [0misinstance[0m[0;34m([0m[0mexpr

In [0]:
cols = ['id', 'first_name', 'last_name']
users_df.select(*cols).show()

+---+----------+------------+
| id|first_name|   last_name|
+---+----------+------------+
|  1|    Corrie|Van den Oord|
|  2|  Nikolaus|     Brewitt|
|  3|    Orelie|      Penney|
|  4|     Ashby|    Maddocks|
|  5|      Kurt|        Rome|
+---+----------+------------+



In [0]:
help(col)

Help on function col in module pyspark.sql.functions:

col(col: str) -> pyspark.sql.column.Column
    Returns a :class:`~pyspark.sql.Column` based on the given column name.'
    Examples
    --------
    >>> col('x')
    Column<'x'>
    >>> column('x')
    Column<'x'>
    
    .. versionadded:: 1.3



In [0]:
user_id=col('id')

In [0]:
user_id

Column<'id'>

In [0]:
users_df.select(user_id).show()

+---+
| id|
+---+
|  1|
|  2|
|  3|
|  4|
|  5|
+---+



In [0]:
users_df.select('id', 'customer_from').dtypes

[('id', 'bigint'), ('customer_from', 'date')]

In [0]:
users_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone_numbers: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- courses: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- is_customer: boolean (nullable = true)
 |-- amount_paid: double (nullable = true)
 |-- customer_from: date (nullable = true)
 |-- last_updated_ts: timestamp (nullable = true)



In [0]:
from pyspark.sql.functions import date_format

In [0]:
users_df.select(
    col('id'),
    'customer_from',
    date_format('customer_from', 'yyyyMMdd')
).show()

+---+-------------+------------------------------------+
| id|customer_from|date_format(customer_from, yyyyMMdd)|
+---+-------------+------------------------------------+
|  1|   2021-01-15|                            20210115|
|  2|   2021-02-14|                            20210214|
|  3|   2021-01-21|                            20210121|
|  4|         NULL|                                NULL|
|  5|         NULL|                                NULL|
+---+-------------+------------------------------------+



In [0]:
users_df.select(
    col('id'),
    'customer_from',
    date_format('customer_from', 'yyyyMMdd')
).printSchema()

root
 |-- id: long (nullable = true)
 |-- customer_from: date (nullable = true)
 |-- date_format(customer_from, yyyyMMdd): string (nullable = true)



In [0]:
users_df.select(
    col('id'),
    date_format('customer_from', 'yyyyMMdd').cast('int').alias('customer_from')
).show()

+---+-------------+
| id|customer_from|
+---+-------------+
|  1|     20210115|
|  2|     20210214|
|  3|     20210121|
|  4|         null|
|  5|         null|
+---+-------------+



In [0]:
cols = [col('id'), date_format('customer_from', 'yyyyMMdd').cast('int').alias('customer_from')]
users_df.select(*cols).show()

+---+-------------+
| id|customer_from|
+---+-------------+
|  1|     20210115|
|  2|     20210214|
|  3|     20210121|
|  4|         null|
|  5|         null|
+---+-------------+



In [0]:
from pyspark.sql.functions import lit

In [0]:
users_df.select(lit(5).alias('height')).withColumn('spark_user', lit(True)).show()

+------+----------+
|height|spark_user|
+------+----------+
|     5|      true|
|     5|      true|
|     5|      true|
|     5|      true|
|     5|      true|
+------+----------+



In [0]:
users_df.createOrReplaceTempView('users')

In [0]:
spark.sql("""
        select id,amount_paid,(amount_paid+25) as paid  from users
""").show()

+---+-----------+-------+
| id|amount_paid|   paid|
+---+-----------+-------+
|  1|    1000.55|1025.55|
|  2|      900.0|  925.0|
|  3|     850.55| 875.55|
|  4|       null|   null|
|  5|       null|   null|
+---+-----------+-------+



In [0]:
users_df.selectExpr('id','amount_paid','(amount_paid+25) as paid').\
               show()

+---+-----------+-------+
| id|amount_paid|   paid|
+---+-----------+-------+
|  1|    1000.55|1025.55|
|  2|      900.0|  925.0|
|  3|     850.55| 875.55|
|  4|       null|   null|
|  5|       null|   null|
+---+-----------+-------+



In [0]:
# This will fail
users_df.select('id', 'amount_paid' + 25).show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
[0;32m<ipython-input-59-dab279906060>[0m in [0;36m<module>[0;34m[0m
[1;32m      1[0m [0;31m# This will fail[0m[0;34m[0m[0;34m[0m[0;34m[0m[0m
[0;32m----> 2[0;31m [0musers_df[0m[0;34m.[0m[0mselect[0m[0;34m([0m[0;34m'id'[0m[0;34m,[0m [0;34m'amount_paid'[0m [0;34m+[0m [0;36m25[0m[0;34m)[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;31mTypeError[0m: can only concatenate str (not "int") to str

In [0]:
from pyspark.sql.functions import lit,col

In [0]:
lit(25)

Column<'25'>

In [0]:
'amount_paid' + lit(25.0)

Column<'(25.0 + amount_paid)'>

In [0]:
'amount_paid' + lit(25.0)

Column<'(25.0 + amount_paid)'>

In [0]:
# Returns null
# amount_paid is converted to string by implicitly using lit.
# Spark returns null when we perform arithmetic operations on noncompatible types
users_df.select('id', 'amount_paid' + lit(25.0), lit(50) + lit(25)).show()

+---+--------------------+---------+
| id|(25.0 + amount_paid)|(50 + 25)|
+---+--------------------+---------+
|  1|                null|       75|
|  2|                null|       75|
|  3|                null|       75|
|  4|                null|       75|
|  5|                null|       75|
+---+--------------------+---------+



In [0]:
users_df.select('id', col('amount_paid') + lit(25.0), users_df.amount_paid+10,users_df['id']+10).show()

+---+--------------------+------------------+---------+
| id|(amount_paid + 25.0)|(amount_paid + 10)|(id + 10)|
+---+--------------------+------------------+---------+
|  1|             1025.55|           1010.55|       11|
|  2|               925.0|             910.0|       12|
|  3|              875.55|            860.55|       13|
|  4|                null|              null|       14|
|  5|                null|              null|       15|
+---+--------------------+------------------+---------+



# Specifying Schema for Spark Dataframe


In [0]:
import datetime
users = [(1,
  'Corrie',
  'Van den Oord',
  'cvandenoord0@etsy.com',
  True,
  1000.55,
  datetime.date(2021, 1, 15),
  datetime.datetime(2021, 2, 10, 1, 15)),
 (2,
  'Nikolaus',
  'Brewitt',
  'nbrewitt1@dailymail.co.uk',
  True,
  900.0,
  datetime.date(2021, 2, 14),
  datetime.datetime(2021, 2, 18, 3, 33)),
 (3,
  'Orelie',
  'Penney',
  'openney2@vistaprint.com',
  True,
  850.55,
  datetime.date(2021, 1, 21),
  datetime.datetime(2021, 3, 15, 15, 16, 55)),
 (4,
  'Ashby',
  'Maddocks',
  'amaddocks3@home.pl',
  False,
  None,
  None,
  datetime.datetime(2021, 4, 10, 17, 45, 30)),
 (5,
  'Kurt',
  'Rome',
  'krome4@shutterfly.com',
  False,
  None,
  None,
  datetime.datetime(2021, 4, 2, 0, 55, 18))]

In [0]:
users_schema1 = '''
    id INT,
    first_name STRING,
    last_name STRING,
    email STRING,
    is_customer BOOLEAN,
    amount_paid FLOAT,
    customer_from DATE,
    last_updated_ts TIMESTAMP
'''

In [0]:
users_schema2 = [
    'id',
    'first_name',
    'last_name'
    'email',
    'is_customer',
    'amount_paid',
    'customer_from',
    'last_updated_ts'
]

In [0]:
from pyspark.sql.types import *

In [0]:
users_schema3 = StructType([
                            StructField('id',IntegerType()),
                            StructField('first_name',StringType()),
                            StructField('last_name',StringType()),
                            StructField('email',StringType()),
                            StructField('is_customer',BooleanType()),
                            StructField('amount_paid',FloatType()),
                            StructField('customer_from',DateType()),
                            StructField('last_updated_ts',TimestampType()),

])

In [0]:
spark.createDataFrame(users).show(truncate=False)

+---+--------+------------+-------------------------+-----+-------+----------+-------------------+
|_1 |_2      |_3          |_4                       |_5   |_6     |_7        |_8                 |
+---+--------+------------+-------------------------+-----+-------+----------+-------------------+
|1  |Corrie  |Van den Oord|cvandenoord0@etsy.com    |true |1000.55|2021-01-15|2021-02-10 01:15:00|
|2  |Nikolaus|Brewitt     |nbrewitt1@dailymail.co.uk|true |900.0  |2021-02-14|2021-02-18 03:33:00|
|3  |Orelie  |Penney      |openney2@vistaprint.com  |true |850.55 |2021-01-21|2021-03-15 15:16:55|
|4  |Ashby   |Maddocks    |amaddocks3@home.pl       |false|null   |null      |2021-04-10 17:45:30|
|5  |Kurt    |Rome        |krome4@shutterfly.com    |false|null   |null      |2021-04-02 00:55:18|
+---+--------+------------+-------------------------+-----+-------+----------+-------------------+



In [0]:
spark.createDataFrame(users,schema=users_schema1).show(truncate=False)

+---+----------+------------+-------------------------+-----------+-----------+-------------+-------------------+
|id |first_name|last_name   |email                    |is_customer|amount_paid|customer_from|last_updated_ts    |
+---+----------+------------+-------------------------+-----------+-----------+-------------+-------------------+
|1  |Corrie    |Van den Oord|cvandenoord0@etsy.com    |true       |1000.55    |2021-01-15   |2021-02-10 01:15:00|
|2  |Nikolaus  |Brewitt     |nbrewitt1@dailymail.co.uk|true       |900.0      |2021-02-14   |2021-02-18 03:33:00|
|3  |Orelie    |Penney      |openney2@vistaprint.com  |true       |850.55     |2021-01-21   |2021-03-15 15:16:55|
|4  |Ashby     |Maddocks    |amaddocks3@home.pl       |false      |null       |null         |2021-04-10 17:45:30|
|5  |Kurt      |Rome        |krome4@shutterfly.com    |false      |null       |null         |2021-04-02 00:55:18|
+---+----------+------------+-------------------------+-----------+-----------+---------

In [0]:
spark.createDataFrame(users,schema=users_schema2).show(truncate=False)

+---+----------+--------------+-------------------------+-----------+-------------+---------------+-------------------+
|id |first_name|last_nameemail|is_customer              |amount_paid|customer_from|last_updated_ts|_8                 |
+---+----------+--------------+-------------------------+-----------+-------------+---------------+-------------------+
|1  |Corrie    |Van den Oord  |cvandenoord0@etsy.com    |true       |1000.55      |2021-01-15     |2021-02-10 01:15:00|
|2  |Nikolaus  |Brewitt       |nbrewitt1@dailymail.co.uk|true       |900.0        |2021-02-14     |2021-02-18 03:33:00|
|3  |Orelie    |Penney        |openney2@vistaprint.com  |true       |850.55       |2021-01-21     |2021-03-15 15:16:55|
|4  |Ashby     |Maddocks      |amaddocks3@home.pl       |false      |null         |null           |2021-04-10 17:45:30|
|5  |Kurt      |Rome          |krome4@shutterfly.com    |false      |null         |null           |2021-04-02 00:55:18|
+---+----------+--------------+---------

In [0]:
spark.createDataFrame(users,schema=users_schema3).show(truncate=False)

+---+----------+------------+-------------------------+-----------+-----------+-------------+-------------------+
|id |first_name|last_name   |email                    |is_customer|amount_paid|customer_from|last_updated_ts    |
+---+----------+------------+-------------------------+-----------+-----------+-------------+-------------------+
|1  |Corrie    |Van den Oord|cvandenoord0@etsy.com    |true       |1000.55    |2021-01-15   |2021-02-10 01:15:00|
|2  |Nikolaus  |Brewitt     |nbrewitt1@dailymail.co.uk|true       |900.0      |2021-02-14   |2021-02-18 03:33:00|
|3  |Orelie    |Penney      |openney2@vistaprint.com  |true       |850.55     |2021-01-21   |2021-03-15 15:16:55|
|4  |Ashby     |Maddocks    |amaddocks3@home.pl       |false      |null       |null         |2021-04-10 17:45:30|
|5  |Kurt      |Rome        |krome4@shutterfly.com    |false      |null       |null         |2021-04-02 00:55:18|
+---+----------+------------+-------------------------+-----------+-----------+---------

# Dataframe using Pandas Dataframe

In [0]:
import datetime
users = [
    {
        "id": 1,
        "first_name": "Corrie",
        "last_name": "Van den Oord",
        "email": "cvandenoord0@etsy.com",
        "is_customer": True,
        "amount_paid": 1000.55,
        "last_updated_ts": datetime.datetime(2021, 2, 10, 1, 15, 0)
    },
    {
        "id": 2,
        "first_name": "Nikolaus",
        "last_name": "Brewitt",
        "email": "nbrewitt1@dailymail.co.uk",
        "is_customer": True,
        "amount_paid": 900.0,
        "last_updated_ts": datetime.datetime(2021, 2, 18, 3, 33, 0)
    },
    {
        "id": 3,
        "first_name": "Orelie",
        "last_name": "Penney",
        "email": "openney2@vistaprint.com",
        "is_customer": True,
        "amount_paid": 850.55,
        "customer_from": datetime.datetime(2021, 1, 21),
        "last_updated_ts": datetime.datetime(2021, 3, 15, 15, 16, 55)
    },
    {
        "id": 4,
        "first_name": "Ashby",
        "last_name": "Maddocks",
        "email": "amaddocks3@home.pl",
        "is_customer": False,
        "last_updated_ts": datetime.datetime(2021, 4, 10, 17, 45, 30)
    },
    {
        "id": 5,
        "first_name": "Kurt",
        "last_name": "Rome",
        "email": "krome4@shutterfly.com",
        "is_customer": False,
        "last_updated_ts": datetime.datetime(2021, 4, 2, 0, 55, 18)
    }
]

In [0]:
from pyspark.sql import Row

In [0]:
df1_normal=spark.createDataFrame(users).show(truncate=False)

+-----------+-------------------------+----------+---+-----------+------------+-------------------+-------------------+
|amount_paid|email                    |first_name|id |is_customer|last_name   |last_updated_ts    |customer_from      |
+-----------+-------------------------+----------+---+-----------+------------+-------------------+-------------------+
|1000.55    |cvandenoord0@etsy.com    |Corrie    |1  |true       |Van den Oord|2021-02-10 01:15:00|null               |
|900.0      |nbrewitt1@dailymail.co.uk|Nikolaus  |2  |true       |Brewitt     |2021-02-18 03:33:00|null               |
|850.55     |openney2@vistaprint.com  |Orelie    |3  |true       |Penney      |2021-03-15 15:16:55|2021-01-21 00:00:00|
|null       |amaddocks3@home.pl       |Ashby     |4  |false      |Maddocks    |2021-04-10 17:45:30|null               |
|null       |krome4@shutterfly.com    |Kurt      |5  |false      |Rome        |2021-04-02 00:55:18|null               |
+-----------+-------------------------+-

In [0]:
spark.createDataFrame([Row(**user) for user in users]).show(truncate=False)

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JJavaError[0m                             Traceback (most recent call last)
[0;32m<ipython-input-80-7267131344fd>[0m in [0;36m<module>[0;34m[0m
[0;32m----> 1[0;31m [0mspark[0m[0;34m.[0m[0mcreateDataFrame[0m[0;34m([0m[0;34m[[0m[0mRow[0m[0;34m([0m[0;34m**[0m[0muser[0m[0;34m)[0m [0;32mfor[0m [0muser[0m [0;32min[0m [0musers[0m[0;34m][0m[0;34m)[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0mtruncate[0m[0;34m=[0m[0;32mFalse[0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/usr/local/lib/python3.7/dist-packages/pyspark/sql/dataframe.py[0m in [0;36mshow[0;34m(self, n, truncate, vertical)[0m
[1;32m    613[0m                 )
[1;32m    614[0m [0;34m[0m[0m
[0;32m--> 615[0;31m             [0mprint[0m[0;34m([0m[0mself[0m[0;34m.[0m[0m_jdf[0m[0;34m.[0m[0mshowString[0m[0;34m([0m[0mn[0m[0;34m,[0m [0mint_truncate[0m[0;34m,[0m 

In [0]:
import pandas as pd

In [0]:
pd.DataFrame(users)

Unnamed: 0,id,first_name,last_name,email,is_customer,amount_paid,last_updated_ts,customer_from
0,1,Corrie,Van den Oord,cvandenoord0@etsy.com,True,1000.55,2021-02-10 01:15:00,NaT
1,2,Nikolaus,Brewitt,nbrewitt1@dailymail.co.uk,True,900.0,2021-02-18 03:33:00,NaT
2,3,Orelie,Penney,openney2@vistaprint.com,True,850.55,2021-03-15 15:16:55,2021-01-21
3,4,Ashby,Maddocks,amaddocks3@home.pl,False,,2021-04-10 17:45:30,NaT
4,5,Kurt,Rome,krome4@shutterfly.com,False,,2021-04-02 00:55:18,NaT


In [0]:
df2_pd=spark.createDataFrame(pd.DataFrame(users)).show()

+---+----------+------------+--------------------+-----------+-----------+-------------------+-------------------+
| id|first_name|   last_name|               email|is_customer|amount_paid|    last_updated_ts|      customer_from|
+---+----------+------------+--------------------+-----------+-----------+-------------------+-------------------+
|  1|    Corrie|Van den Oord|cvandenoord0@etsy...|       true|    1000.55|2021-02-10 01:15:00|               null|
|  2|  Nikolaus|     Brewitt|nbrewitt1@dailyma...|       true|      900.0|2021-02-18 03:33:00|               null|
|  3|    Orelie|      Penney|openney2@vistapri...|       true|     850.55|2021-03-15 15:16:55|2021-01-21 00:00:00|
|  4|     Ashby|    Maddocks|  amaddocks3@home.pl|      false|       null|2021-04-10 17:45:30|               null|
|  5|      Kurt|        Rome|krome4@shutterfly...|      false|       null|2021-04-02 00:55:18|               null|
+---+----------+------------+--------------------+-----------+-----------+------

#Filtering


In [0]:
from pyspark.sql import Row
from pyspark.sql.functions import col

In [0]:
import datetime
users = [
    {
        "id": 1,
        "first_name": "Corrie",
        "last_name": "Van den Oord",
        "email": "cvandenoord0@etsy.com",
        "gender": "male",
        "current_city": "Dallas",
        "phone_numbers": Row(mobile="+1 234 567 8901", home="+1 234 567 8911"),
        "courses": [1, 2],
        "is_customer": True,
        "amount_paid": 1000.55,
        "customer_from": datetime.date(2021, 1, 15),
        "last_updated_ts": datetime.datetime(2021, 2, 10, 1, 15, 0)
    },
    {
        "id": 2,
        "first_name": "Nikolaus",
        "last_name": "Brewitt",
        "email": "nbrewitt1@dailymail.co.uk",
        "gender": "male",
        "current_city": "Houston",
        "phone_numbers":  Row(mobile="+1 234 567 8923", home="1 234 567 8934"),
        "courses": [3],
        "is_customer": True,
        "amount_paid": 900.0,
        "customer_from": datetime.date(2021, 2, 14),
        "last_updated_ts": datetime.datetime(2021, 2, 18, 3, 33, 0)
    },
    {
        "id": 3,
        "first_name": "Orelie",
        "last_name": "Penney",
        "email": "openney2@vistaprint.com",
        "gender": "female",
        "current_city": "",
        "phone_numbers": Row(mobile="+1 714 512 9752", home="+1 714 512 6601"),
        "courses": [2, 4],
        "is_customer": True,
        "amount_paid": 850.55,
        "customer_from": datetime.date(2021, 1, 21),
        "last_updated_ts": datetime.datetime(2021, 3, 15, 15, 16, 55)
    },
    {
        "id": 4,
        "first_name": "Ashby",
        "last_name": "Maddocks",
        "email": "amaddocks3@home.pl",
        "gender": "male",
        "current_city": "San Fransisco",
        "phone_numbers": Row(mobile=None, home=None),
        "courses": [],
        "is_customer": False,
        "amount_paid": None,
        "customer_from": None,
        "last_updated_ts": datetime.datetime(2021, 4, 10, 17, 45, 30)
    },
    {
        "id": 5,
        "first_name": "Kurt",
        "last_name": "Rome",
        "email": "krome4@shutterfly.com",
        "gender": "female",
        "current_city": None,
        "phone_numbers": Row(mobile="+1 817 934 7142", home=None),
        "courses": [],
        "is_customer": False,
        "amount_paid": None,
        "customer_from": None,
        "last_updated_ts": datetime.datetime(2021, 4, 2, 0, 55, 18)
    }
]

In [0]:
import pandas as pd

In [0]:
users_df = spark.createDataFrame(pd.DataFrame(users))

In [0]:
users_df.filter(col('is_customer') == True).show()

+---+----------+------------+--------------------+------+------------+--------------------+-------+-----------+-----------+-------------+-------------------+
| id|first_name|   last_name|               email|gender|current_city|       phone_numbers|courses|is_customer|amount_paid|customer_from|    last_updated_ts|
+---+----------+------------+--------------------+------+------------+--------------------+-------+-----------+-----------+-------------+-------------------+
|  1|    Corrie|Van den Oord|cvandenoord0@etsy...|  male|      Dallas|[+1 234 567 8901,...| [1, 2]|       true|    1000.55|   2021-01-15|2021-02-10 01:15:00|
|  2|  Nikolaus|     Brewitt|nbrewitt1@dailyma...|  male|     Houston|[+1 234 567 8923,...|    [3]|       true|      900.0|   2021-02-14|2021-02-18 03:33:00|
|  3|    Orelie|      Penney|openney2@vistapri...|female|            |[+1 714 512 9752,...| [2, 4]|       true|     850.55|   2021-01-21|2021-03-15 15:16:55|
+---+----------+------------+--------------------+--

In [0]:
users_df.filter(col('is_customer') == 'true').show()

+---+----------+------------+--------------------+------+------------+--------------------+-------+-----------+-----------+-------------+-------------------+
| id|first_name|   last_name|               email|gender|current_city|       phone_numbers|courses|is_customer|amount_paid|customer_from|    last_updated_ts|
+---+----------+------------+--------------------+------+------------+--------------------+-------+-----------+-----------+-------------+-------------------+
|  1|    Corrie|Van den Oord|cvandenoord0@etsy...|  male|      Dallas|[+1 234 567 8901,...| [1, 2]|       true|    1000.55|   2021-01-15|2021-02-10 01:15:00|
|  2|  Nikolaus|     Brewitt|nbrewitt1@dailyma...|  male|     Houston|[+1 234 567 8923,...|    [3]|       true|      900.0|   2021-02-14|2021-02-18 03:33:00|
|  3|    Orelie|      Penney|openney2@vistapri...|female|            |[+1 714 512 9752,...| [2, 4]|       true|     850.55|   2021-01-21|2021-03-15 15:16:55|
+---+----------+------------+--------------------+--

In [0]:
users_df.filter('is_customer = "true"').show()

+---+----------+------------+--------------------+------+------------+--------------------+-------+-----------+-----------+-------------+-------------------+
| id|first_name|   last_name|               email|gender|current_city|       phone_numbers|courses|is_customer|amount_paid|customer_from|    last_updated_ts|
+---+----------+------------+--------------------+------+------------+--------------------+-------+-----------+-----------+-------------+-------------------+
|  1|    Corrie|Van den Oord|cvandenoord0@etsy...|  male|      Dallas|[+1 234 567 8901,...| [1, 2]|       true|    1000.55|   2021-01-15|2021-02-10 01:15:00|
|  2|  Nikolaus|     Brewitt|nbrewitt1@dailyma...|  male|     Houston|[+1 234 567 8923,...|    [3]|       true|      900.0|   2021-02-14|2021-02-18 03:33:00|
|  3|    Orelie|      Penney|openney2@vistapri...|female|            |[+1 714 512 9752,...| [2, 4]|       true|     850.55|   2021-01-21|2021-03-15 15:16:55|
+---+----------+------------+--------------------+--

In [0]:
users_df.createOrReplaceTempView('users')

In [0]:
spark.sql('''
    SELECT * FROM users
    WHERE is_customer = "true"
'''). \
    show()

+---+----------+------------+--------------------+------+------------+--------------------+-------+-----------+-----------+-------------+-------------------+
| id|first_name|   last_name|               email|gender|current_city|       phone_numbers|courses|is_customer|amount_paid|customer_from|    last_updated_ts|
+---+----------+------------+--------------------+------+------------+--------------------+-------+-----------+-----------+-------------+-------------------+
|  1|    Corrie|Van den Oord|cvandenoord0@etsy...|  male|      Dallas|[+1 234 567 8901,...| [1, 2]|       true|    1000.55|   2021-01-15|2021-02-10 01:15:00|
|  2|  Nikolaus|     Brewitt|nbrewitt1@dailyma...|  male|     Houston|[+1 234 567 8923,...|    [3]|       true|      900.0|   2021-02-14|2021-02-18 03:33:00|
|  3|    Orelie|      Penney|openney2@vistapri...|female|            |[+1 714 512 9752,...| [2, 4]|       true|     850.55|   2021-01-21|2021-03-15 15:16:55|
+---+----------+------------+--------------------+--

In [0]:
users_df.filter("current_city == 'Dallas'").show()

+---+----------+------------+--------------------+------+------------+--------------------+-------+-----------+-----------+-------------+-------------------+
| id|first_name|   last_name|               email|gender|current_city|       phone_numbers|courses|is_customer|amount_paid|customer_from|    last_updated_ts|
+---+----------+------------+--------------------+------+------------+--------------------+-------+-----------+-----------+-------------+-------------------+
|  1|    Corrie|Van den Oord|cvandenoord0@etsy...|  male|      Dallas|[+1 234 567 8901,...| [1, 2]|       true|    1000.55|   2021-01-15|2021-02-10 01:15:00|
+---+----------+------------+--------------------+------+------------+--------------------+-------+-----------+-----------+-------------+-------------------+



In [0]:
users_df.filter(col('amount_paid') == 900.0).show()

+---+----------+---------+--------------------+------+------------+--------------------+-------+-----------+-----------+-------------+-------------------+
| id|first_name|last_name|               email|gender|current_city|       phone_numbers|courses|is_customer|amount_paid|customer_from|    last_updated_ts|
+---+----------+---------+--------------------+------+------------+--------------------+-------+-----------+-----------+-------------+-------------------+
|  2|  Nikolaus|  Brewitt|nbrewitt1@dailyma...|  male|     Houston|[+1 234 567 8923,...|    [3]|       true|      900.0|   2021-02-14|2021-02-18 03:33:00|
+---+----------+---------+--------------------+------+------------+--------------------+-------+-----------+-----------+-------------+-------------------+



In [0]:
users_df. \
    select('id', 'email', 'last_updated_ts'). \
    filter(col('last_updated_ts').between('2021-02-15 00:00:00', '2021-03-15 23:59:59')). \
    show()

+---+--------------------+-------------------+
| id|               email|    last_updated_ts|
+---+--------------------+-------------------+
|  2|nbrewitt1@dailyma...|2021-02-18 03:33:00|
|  3|openney2@vistapri...|2021-03-15 15:16:55|
+---+--------------------+-------------------+



In [0]:
users_df. \
    select('id', 'amount_paid'). \
    filter(col('amount_paid').between(850, 900)). \
    show()

+---+-----------+
| id|amount_paid|
+---+-----------+
|  2|      900.0|
|  3|     850.55|
+---+-----------+



In [0]:
users_df. \
    select('id', 'current_city'). \
    filter((col('current_city') == 'Houston') | (col('current_city') == 'Dallas')). \
    show()

+---+------------+
| id|current_city|
+---+------------+
|  1|      Dallas|
|  2|     Houston|
+---+------------+



In [0]:
users_df. \
    filter((col('gender') == 'male') & (col('is_customer') == True)). \
    select('id', 'gender', 'is_customer'). \
    show()

+---+------+-----------+
| id|gender|is_customer|
+---+------+-----------+
|  1|  male|       true|
|  2|  male|       true|
+---+------+-----------+



#Drop

In [0]:
import datetime
users = [
    {
        "id": 1,
        "first_name": "Corrie",
        "last_name": "Van den Oord",
        "email": "cvandenoord0@etsy.com",
        "phone_numbers": Row(mobile="+1 234 567 8901", home="+1 234 567 8911"),
        "courses": [1, 2],
        "is_customer": True,
        "amount_paid": 1000.55,
        "customer_from": datetime.date(2021, 1, 15),
        "last_updated_ts": datetime.datetime(2021, 2, 10, 1, 15, 0)
    },
    {
        "id": 2,
        "first_name": "Nikolaus",
        "last_name": "Brewitt",
        "email": "nbrewitt1@dailymail.co.uk",
        "phone_numbers":  Row(mobile="+1 234 567 8923", home="1 234 567 8934"),
        "courses": [3],
        "is_customer": True,
        "amount_paid": 900.0,
        "customer_from": datetime.date(2021, 2, 14),
        "last_updated_ts": datetime.datetime(2021, 2, 18, 3, 33, 0)
    },
    {
        "id": 3,
        "first_name": "Orelie",
        "last_name": "Penney",
        "email": "openney2@vistaprint.com",
        "phone_numbers": Row(mobile="+1 714 512 9752", home="+1 714 512 6601"),
        "courses": [2, 4],
        "is_customer": True,
        "amount_paid": 850.55,
        "customer_from": datetime.date(2021, 1, 21),
        "last_updated_ts": datetime.datetime(2021, 3, 15, 15, 16, 55)
    },
    {
        "id": 4,
        "first_name": "Ashby",
        "last_name": "Maddocks",
        "email": "amaddocks3@home.pl",
        "phone_numbers": Row(mobile=None, home=None),
        "courses": [],
        "is_customer": False,
        "amount_paid": None,
        "customer_from": None,
        "last_updated_ts": datetime.datetime(2021, 4, 10, 17, 45, 30)
    },
    {
        "id": 5,
        "first_name": "Kurt",
        "last_name": "Rome",
        "email": "krome4@shutterfly.com",
        "phone_numbers": Row(mobile="+1 817 934 7142", home=None),
        "courses": [],
        "is_customer": False,
        "amount_paid": None,
        "customer_from": None,
        "last_updated_ts": datetime.datetime(2021, 4, 2, 0, 55, 18)
    }
]

In [0]:
import pandas as pd

In [0]:
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', False)

In [0]:
users_df = spark.createDataFrame(pd.DataFrame(users))

In [0]:
users_df.show()

+---+----------+------------+--------------------+--------------------+-------+-----------+-----------+-------------+-------------------+
| id|first_name|   last_name|               email|       phone_numbers|courses|is_customer|amount_paid|customer_from|    last_updated_ts|
+---+----------+------------+--------------------+--------------------+-------+-----------+-----------+-------------+-------------------+
|  1|    Corrie|Van den Oord|cvandenoord0@etsy...|{+1 234 567 8901,...| [1, 2]|       true|    1000.55|   2021-01-15|2021-02-10 01:15:00|
|  2|  Nikolaus|     Brewitt|nbrewitt1@dailyma...|{+1 234 567 8923,...|    [3]|       true|      900.0|   2021-02-14|2021-02-18 03:33:00|
|  3|    Orelie|      Penney|openney2@vistapri...|{+1 714 512 9752,...| [2, 4]|       true|     850.55|   2021-01-21|2021-03-15 15:16:55|
|  4|     Ashby|    Maddocks|  amaddocks3@home.pl|        {null, null}|     []|      false|        NaN|         null|2021-04-10 17:45:30|
|  5|      Kurt|        Rome|krome

In [0]:
help(users_df.drop)

Help on method drop in module pyspark.sql.dataframe:

drop(*cols: 'ColumnOrName') -> 'DataFrame' method of pyspark.sql.dataframe.DataFrame instance
    Returns a new :class:`DataFrame` that drops the specified column.
    This is a no-op if schema doesn't contain the given column name(s).
    
    .. versionadded:: 1.4.0
    
    Parameters
    ----------
    cols: str or :class:`Column`
        a name of the column, or the :class:`Column` to drop
    
    Examples
    --------
    >>> df.drop('age').collect()
    [Row(name='Alice'), Row(name='Bob')]
    
    >>> df.drop(df.age).collect()
    [Row(name='Alice'), Row(name='Bob')]
    
    >>> df.join(df2, df.name == df2.name, 'inner').drop(df.name).collect()
    [Row(age=5, height=85, name='Bob')]
    
    >>> df.join(df2, df.name == df2.name, 'inner').drop(df2.name).collect()
    [Row(age=5, name='Bob', height=85)]
    
    >>> df.join(df2, 'name', 'inner').drop('age', 'height').collect()
    [Row(name='Bob')]



In [0]:
users_df.drop(users_df['last_updated_ts']).printSchema()

root
 |-- id: long (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone_numbers: struct (nullable = true)
 |    |-- mobile: string (nullable = true)
 |    |-- home: string (nullable = true)
 |-- courses: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- is_customer: boolean (nullable = true)
 |-- amount_paid: double (nullable = true)
 |-- customer_from: date (nullable = true)



In [0]:
from pyspark.sql.functions import col

In [0]:
user1=users_df.drop(col('last_updated_ts')).printSchema()

root
 |-- id: long (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- email: string (nullable = true)
 |-- phone_numbers: struct (nullable = true)
 |    |-- mobile: string (nullable = true)
 |    |-- home: string (nullable = true)
 |-- courses: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- is_customer: boolean (nullable = true)
 |-- amount_paid: double (nullable = true)
 |-- customer_from: date (nullable = true)



In [0]:
users_df.drop('first_name', 'last_name').printSchema()

root
 |-- id: long (nullable = true)
 |-- email: string (nullable = true)
 |-- phone_numbers: struct (nullable = true)
 |    |-- mobile: string (nullable = true)
 |    |-- home: string (nullable = true)
 |-- courses: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- is_customer: boolean (nullable = true)
 |-- amount_paid: double (nullable = true)
 |-- customer_from: date (nullable = true)
 |-- last_updated_ts: timestamp (nullable = true)



# read

In [0]:
%fs ls /content/customers.csv

UsageError: Line magic function `%fs` not found.


In [0]:
%fs ls /public/retail_db/orders

In [0]:
schema = """
    order_id INT,
    order_date TIMESTAMP,
    order_customer_id INT,
    order_status STRING
"""

In [0]:
orders = spark.read.csv('/content/customers.csv')

In [0]:
orders.show()

+-----------+-------------+----------+
|        _c0|          _c1|       _c2|
+-----------+-------------+----------+
|customer_id|customer_name|  location|
|       C001|        Alice|  New York|
|       C002|          Bob|California|
|       C003|      Charlie|     Texas|
|       C004|        Daisy|   Florida|
|       C005|        Ethan|      Ohio|
|       C006|        Fiona|Washington|
|       C007|       George|   Arizona|
|       C008|       Hannah|  Colorado|
+-----------+-------------+----------+



In [0]:
orders.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)



In [0]:
orders = spark.read.json('/content/sales_orders.json')

In [0]:
orders.printSchema()

root
 |-- amount: double (nullable = true)
 |-- order_id: long (nullable = true)
 |-- order_status: string (nullable = true)



In [0]:
orders.show()

+------+--------+------------+
|amount|order_id|order_status|
+------+--------+------------+
| 250.0|    1001|     Shipped|
| 125.5|    1002|     Pending|
| 300.0|    1003|   Cancelled|
| 450.0|    1004|     Shipped|
| 110.0|    1005|     Pending|
| 99.99|    1006|     Shipped|
| 275.0|    1007|     Shipped|
| 120.0|    1008|     Pending|
|  85.0|    1009|   Cancelled|
| 340.0|    1010|     Shipped|
| 130.0|    1011|     Pending|
|499.99|    1012|     Shipped|
| 89.99|    1013|   Cancelled|
| 199.0|    1014|     Shipped|
| 150.0|    1015|     Pending|
| 310.0|    1016|     Shipped|
| 400.0|    1017|   Cancelled|
| 210.0|    1018|     Pending|
| 360.0|    1019|     Shipped|
|  95.0|    1020|     Pending|
+------+--------+------------+



* Supported file formats.
  * `csv`
  * `text`
  * `json`
  * `parquet`
  * `orc`
* Other common file formats.
  * `xml`
  * `avro`

In [0]:
import getpass
username = getpass.getuser()

input_dir = '/public/retail_db'
output_dir = f'/user/{username}/retail_db_pipe'

In [0]:
dbutils.fs.ls(input_dir)

In [0]:
# Generate CSV files with pipe delimiter
for file_details in dbutils.fs.ls(input_dir):
  print(f'Converting data in {file_details.path} folder from comma separated to pipe separated')
  df = spark.read.csv(file_details.path)
  folder_name = file_details.path.split('/')[-2]
  df.coalesce(1).write.mode('overwrite').csv(f'{output_dir}/{folder_name}', sep='|')

In [0]:
schema = """
    order_id INT,
    order_date TIMESTAMP,
    order_customer_id INT,
    order_status STRING
"""

In [0]:
orders = spark.read.schema(schema).csv(f'/user/{username}/retail_db_pipe/orders')

In [0]:
orders.show()

In [0]:
orders = spark.read.schema(schema).csv(f'/user/{username}/retail_db_pipe/orders', sep='|')

In [0]:
orders.show()

#write

In [0]:
courses = [{'course_id': 1,
  'course_name': '2020 Complete Python Bootcamp: From Zero to Hero in Python',
  'suitable_for': 'Beginner',
  'enrollment': 1100093,
  'stars': 4.6,
  'number_of_ratings': 318066},
 {'course_id': 4,
  'course_name': 'Angular - The Complete Guide (2020 Edition)',
  'suitable_for': 'Intermediate',
  'enrollment': 422557,
  'stars': 4.6,
  'number_of_ratings': 129984},
 {'course_id': 12,
  'course_name': 'Automate the Boring Stuff with Python Programming',
  'suitable_for': 'Advanced',
  'enrollment': 692617,
  'stars': 4.6,
  'number_of_ratings': 70508},
 {'course_id': 10,
  'course_name': 'Complete C# Unity Game Developer 2D',
  'suitable_for': 'Advanced',
  'enrollment': 364934,
  'stars': 4.6,
  'number_of_ratings': 78989},
 {'course_id': 5,
  'course_name': 'Java Programming Masterclass for Software Developers',
  'suitable_for': 'Advanced',
  'enrollment': 502572,
  'stars': 4.6,
  'number_of_ratings': 123798},
 {'course_id': 15,
  'course_name': 'Learn Python Programming Masterclass',
  'suitable_for': 'Advanced',
  'enrollment': 240790,
  'stars': 4.5,
  'number_of_ratings': 58677},
 {'course_id': 3,
  'course_name': 'Machine Learning A-Z™: Hands-On Python & R In Data Science',
  'suitable_for': 'Intermediate',
  'enrollment': 692812,
  'stars': 4.5,
  'number_of_ratings': 132228},
 {'course_id': 14,
  'course_name': 'Modern React with Redux [2020 Update]',
  'suitable_for': 'Intermediate',
  'enrollment': 203214,
  'stars': 4.7,
  'number_of_ratings': 60835},
 {'course_id': 8,
  'course_name': 'Python for Data Science and Machine Learning Bootcamp',
  'suitable_for': 'Intermediate',
  'enrollment': 387789,
  'stars': 4.6,
  'number_of_ratings': 87403},
 {'course_id': 6,
  'course_name': 'React - The Complete Guide (incl Hooks, React Router, Redux)',
  'suitable_for': 'Intermediate',
  'enrollment': 304670,
  'stars': 4.6,
  'number_of_ratings': 90964},
 {'course_id': 18,
  'course_name': 'Selenium WebDriver with Java -Basics to Advanced+Frameworks',
  'suitable_for': 'Advanced',
  'enrollment': 148562,
  'stars': 4.6,
  'number_of_ratings': 49947},
 {'course_id': 21,
  'course_name': 'Spring & Hibernate for Beginners (includes Spring Boot)',
  'suitable_for': 'Advanced',
  'enrollment': 177053,
  'stars': 4.6,
  'number_of_ratings': 45329},
 {'course_id': 7,
  'course_name': 'The Complete 2020 Web Development Bootcamp',
  'suitable_for': 'Beginner',
  'enrollment': 270656,
  'stars': 4.7,
  'number_of_ratings': 88098},
 {'course_id': 9,
  'course_name': 'The Complete JavaScript Course 2020: Build Real Projects!',
  'suitable_for': 'Intermediate',
  'enrollment': 347979,
  'stars': 4.6,
  'number_of_ratings': 83521},
 {'course_id': 16,
  'course_name': 'The Complete Node.js Developer Course (3rd Edition)',
  'suitable_for': 'Advanced',
  'enrollment': 202922,
  'stars': 4.7,
  'number_of_ratings': 50885},
 {'course_id': 13,
  'course_name': 'The Complete Web Developer Course 2.0',
  'suitable_for': 'Intermediate',
  'enrollment': 273598,
  'stars': 4.5,
  'number_of_ratings': 63175},
 {'course_id': 11,
  'course_name': 'The Data Science Course 2020: Complete Data Science Bootcamp',
  'suitable_for': 'Beginner',
  'enrollment': 325047,
  'stars': 4.5,
  'number_of_ratings': 76907},
 {'course_id': 20,
  'course_name': 'The Ultimate MySQL Bootcamp: Go from SQL Beginner to Expert',
  'suitable_for': 'Beginner',
  'enrollment': 203366,
  'stars': 4.6,
  'number_of_ratings': 45382},
 {'course_id': 2,
  'course_name': 'The Web Developer Bootcamp',
  'suitable_for': 'Beginner',
  'enrollment': 596726,
  'stars': 4.6,
  'number_of_ratings': 182997},
 {'course_id': 19,
  'course_name': 'Unreal Engine C++ Developer: Learn C++ and Make Video Games',
  'suitable_for': 'Advanced',
  'enrollment': 229005,
  'stars': 4.5,
  'number_of_ratings': 45860},
 {'course_id': 17,
  'course_name': 'iOS 13 & Swift 5 - The Complete iOS App Development Bootcamp',
  'suitable_for': 'Advanced',
  'enrollment': 179598,
  'stars': 4.8,
  'number_of_ratings': 49972}]

from pyspark.sql import Row
courses_df = spark.createDataFrame([Row(**course) for course in courses])

In [0]:
import getpass
username = getpass.getuser()

In [0]:
dbutils.fs.ls(f'/user/{username}/courses')

In [0]:
courses_df.columns

In [0]:
courses_df. \
    coalesce(1). \
    write. \
    csv(f'/user/{username}/courses', mode='overwrite', header=True)

In [0]:
dbutils.fs.ls(f'/user/{username}/courses')

In [0]:
spark.read.text(f'/user/{username}/courses').show(truncate=False)

In [0]:
spark.read.csv(f'/user/{username}/courses', header=True).show()

In [0]:
courses_df. \
    coalesce(1). \
    write. \
    format('csv'). \
    save(f'/user/{username}/courses', mode='overwrite', header=True)

In [0]:
dbutils.fs.ls(f'/user/{username}/courses')

In [0]:
spark.read.csv(f'/user/{username}/courses', header=True,,mode='overwrite').show()

#Spark SQL

In [0]:
help(spark.sql)

Help on method sql in module pyspark.sql.session:

sql(sqlQuery: str, **kwargs: Any) -> pyspark.sql.dataframe.DataFrame method of pyspark.sql.session.SparkSession instance
    Returns a :class:`DataFrame` representing the result of the given query.
    When ``kwargs`` is specified, this method formats the given string by using the Python
    standard formatter.
    
    .. versionadded:: 2.0.0
    
    Parameters
    ----------
    sqlQuery : str
        SQL query string.
    kwargs : dict
        Other variables that the user wants to set that can be referenced in the query
    
        .. versionchanged:: 3.3.0
           Added optional argument ``kwargs`` to specify the mapping of variables in the query.
           This feature is experimental and unstable.
    
    Returns
    -------
    :class:`DataFrame`
    
    Examples
    --------
    Executing a SQL query.
    
    >>> spark.sql("SELECT * FROM range(10) where id > 7").show()
    +---+
    | id|
    +---+
    |  8|
    |  9|

In [0]:
from pyspark.sql import Row

In [0]:
import datetime
users = [
    {
        "id": 1,
        "first_name": "Corrie",
        "last_name": "Van den Oord",
        "email": "cvandenoord0@etsy.com",
        "phone_numbers": Row(mobile="+1 234 567 8901", home="+1 234 567 8911"),
        "courses": [1, 2],
        "is_customer": True,
        "amount_paid": 1000.55,
        "customer_from": datetime.date(2021, 1, 15),
        "last_updated_ts": datetime.datetime(2021, 2, 10, 1, 15, 0)
    },
    {
        "id": 2,
        "first_name": "Nikolaus",
        "last_name": "Brewitt",
        "email": "nbrewitt1@dailymail.co.uk",
        "phone_numbers":  Row(mobile="+1 234 567 8923", home="1 234 567 8934"),
        "courses": [3],
        "is_customer": True,
        "amount_paid": 900.0,
        "customer_from": datetime.date(2021, 2, 14),
        "last_updated_ts": datetime.datetime(2021, 2, 18, 3, 33, 0)
    },
    {
        "id": 3,
        "first_name": "Orelie",
        "last_name": "Penney",
        "email": "openney2@vistaprint.com",
        "phone_numbers": Row(mobile="+1 714 512 9752", home="+1 714 512 6601"),
        "courses": [2, 4],
        "is_customer": True,
        "amount_paid": 850.55,
        "customer_from": datetime.date(2021, 1, 21),
        "last_updated_ts": datetime.datetime(2021, 3, 15, 15, 16, 55)
    },
    {
        "id": 4,
        "first_name": "Ashby",
        "last_name": "Maddocks",
        "email": "amaddocks3@home.pl",
        "phone_numbers": Row(mobile=None, home=None),
        "courses": [],
        "is_customer": False,
        "amount_paid": None,
        "customer_from": None,
        "last_updated_ts": datetime.datetime(2021, 4, 10, 17, 45, 30)
    },
    {
        "id": 5,
        "first_name": "Kurt",
        "last_name": "Rome",
        "email": "krome4@shutterfly.com",
        "phone_numbers": Row(mobile="+1 817 934 7142", home=None),
        "courses": [],
        "is_customer": False,
        "amount_paid": None,
        "customer_from": None,
        "last_updated_ts": datetime.datetime(2021, 4, 2, 0, 55, 28)
    }
]

In [0]:
df= spark.createDataFrame(users)

In [0]:
df.createOrReplaceTempView('users_View')

In [0]:
spark.sql('select * from users_view').show()

+-----------+-------+-------------+--------------------+----------+---+-----------+------------+-------------------+--------------------+
|amount_paid|courses|customer_from|               email|first_name| id|is_customer|   last_name|    last_updated_ts|       phone_numbers|
+-----------+-------+-------------+--------------------+----------+---+-----------+------------+-------------------+--------------------+
|    1000.55| [1, 2]|   2021-01-15|cvandenoord0@etsy...|    Corrie|  1|       true|Van den Oord|2021-02-10 01:15:00|{+1 234 567 8901,...|
|      900.0|    [3]|   2021-02-14|nbrewitt1@dailyma...|  Nikolaus|  2|       true|     Brewitt|2021-02-18 03:33:00|{+1 234 567 8923,...|
|     850.55| [2, 4]|   2021-01-21|openney2@vistapri...|    Orelie|  3|       true|      Penney|2021-03-15 15:16:55|{+1 714 512 9752,...|
|       null|     []|         null|  amaddocks3@home.pl|     Ashby|  4|      false|    Maddocks|2021-04-10 17:45:30|        {null, null}|
|       null|     []|         null

In [0]:
spark.sql('select * from users_view where  not Isnull(customer_from)').show()

+-----------+-------+-------------+--------------------+----------+---+-----------+------------+-------------------+--------------------+
|amount_paid|courses|customer_from|               email|first_name| id|is_customer|   last_name|    last_updated_ts|       phone_numbers|
+-----------+-------+-------------+--------------------+----------+---+-----------+------------+-------------------+--------------------+
|    1000.55| [1, 2]|   2021-01-15|cvandenoord0@etsy...|    Corrie|  1|       true|Van den Oord|2021-02-10 01:15:00|{+1 234 567 8901,...|
|      900.0|    [3]|   2021-02-14|nbrewitt1@dailyma...|  Nikolaus|  2|       true|     Brewitt|2021-02-18 03:33:00|{+1 234 567 8923,...|
|     850.55| [2, 4]|   2021-01-21|openney2@vistapri...|    Orelie|  3|       true|      Penney|2021-03-15 15:16:55|{+1 714 512 9752,...|
+-----------+-------+-------------+--------------------+----------+---+-----------+------------+-------------------+--------------------+



In [0]:
help(spark.udf.register)

Help on method register in module pyspark.sql.udf:

register(name: str, f: Union[Callable[..., Any], ForwardRef('UserDefinedFunctionLike')], returnType: Union[ForwardRef('DataTypeOrString'), NoneType] = None) -> 'UserDefinedFunctionLike' method of pyspark.sql.udf.UDFRegistration instance
    Register a Python function (including lambda function) or a user-defined function
    as a SQL function.
    
    .. versionadded:: 1.3.1
    
    Parameters
    ----------
    name : str,
        name of the user-defined function in SQL statements.
    f : function, :meth:`pyspark.sql.functions.udf` or :meth:`pyspark.sql.functions.pandas_udf`
        a Python function, or a user-defined function. The user-defined function can
        be either row-at-a-time or vectorized. See :meth:`pyspark.sql.functions.udf` and
        :meth:`pyspark.sql.functions.pandas_udf`.
    returnType : :class:`pyspark.sql.types.DataType` or str, optional
        the return type of the registered user-defined function. Th

In [0]:
dc = spark.udf.register('date_convert', lambda d: int(d[:10].replace('-', '')))

In [0]:
##choose a data

In [0]:
df.selectExpr('date_convert(order_date) AS order_date').show()

In [0]:
df.select(dc('order_date')).show()

In [0]:
df.createOrReplaceTempView('orders')

In [0]:
spark.sql('''
    SELECT o.*, date_convert(order_date) AS order_date_as_int
    FROM orders AS o
'''). \
    show()

In [0]:
spark.sql('''
    SELECT o.*, date_convert(order_date) AS order_date_as_int
    FROM orders AS o
    WHERE date_convert(order_date) = 20140101
'''). \
    show()

#Error Handler

In [0]:
import datetime
users = [
    {
        "id": '1',
        "first_name": "Corrie",
        "last_name": "Van den Oord"
    },
    {
        "id": '2',
        "first_name": "Nikolaus",
        "last_name": "Brewitt"
    },
    {
        "id": '3',
        "first_name": "Orelie",
        "last_name": "Penney"
    },
    {
        "id": '4a',
        "first_name": "Ashby",
        "last_name": "Maddocks"
    },
    {
        "id": 'c5',
        "first_name": "Kurt",
        "last_name": "Rome"
    }
]

In [0]:
spark.createDataFrame(users).show()

+----------+---+------------+
|first_name| id|   last_name|
+----------+---+------------+
|    Corrie|  1|Van den Oord|
|  Nikolaus|  2|     Brewitt|
|    Orelie|  3|      Penney|
|     Ashby| 4a|    Maddocks|
|      Kurt| c5|        Rome|
+----------+---+------------+



In [0]:
spark.createDataFrame(users,schema='id string, first_name string,last_name string').show()

+---+----------+------------+
| id|first_name|   last_name|
+---+----------+------------+
|  1|    Corrie|Van den Oord|
|  2|  Nikolaus|     Brewitt|
|  3|    Orelie|      Penney|
| 4a|     Ashby|    Maddocks|
| c5|      Kurt|        Rome|
+---+----------+------------+



In [0]:
spark.createDataFrame(users,schema='id int, first_name string,last_name string').show()

[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
[0;32m<ipython-input-30-284a81c18d02>[0m in [0;36m<module>[0;34m[0m
[0;32m----> 1[0;31m [0mspark[0m[0;34m.[0m[0mcreateDataFrame[0m[0;34m([0m[0musers[0m[0;34m,[0m[0mschema[0m[0;34m=[0m[0;34m'id int, first_name string,last_name string'[0m[0;34m)[0m[0;34m.[0m[0mshow[0m[0;34m([0m[0;34m)[0m[0;34m[0m[0;34m[0m[0m
[0m
[0;32m/usr/local/lib/python3.7/dist-packages/pyspark/sql/session.py[0m in [0;36mcreateDataFrame[0;34m(self, data, schema, samplingRatio, verifySchema)[0m
[1;32m    893[0m             )
[1;32m    894[0m         return self._create_dataframe(
[0;32m--> 895[0;31m             [0mdata[0m[0;34m,[0m [0mschema[0m[0;34m,[0m [0msamplingRatio[0m[0;34m,[0m [0mverifySchema[0m  [0;31m# type: ignore[arg-type][0m[0;34m[0m[0;34m[0m[0m
[0m[1;32m    896

In [0]:
spark.createDataFrame(users,schema='id string, first_name string,last_name string').show()

[0;36m  File [0;32m"<ipython-input-31-0388f6f2db6e>"[0;36m, line [0;32m1[0m
[0;31m    spark.createDataFrame(users,schema='id string, first_name string,last_name string',mode).show()[0m
[0m                                                                                      ^[0m
[0;31mSyntaxError[0m[0;31m:[0m positional argument follows keyword argument
