# Intro
This is a short overview of the core PySpark methods we usually use to work with data.

Full API docs are here:
* DataFrame methods — https://spark.apache.org/docs/3.1.3/api/python/reference/pyspark.sql.html#dataframe-apis
* Columns — https://spark.apache.org/docs/3.1.3/api/python/reference/pyspark.sql.html#dataframe-apis
* SQL-like functions — https://spark.apache.org/docs/3.1.3/api/python/reference/pyspark.sql.html#functions
* Window functions — https://spark.apache.org/docs/3.1.3/api/python/reference/pyspark.sql.html#window
* Grouping functions — https://spark.apache.org/docs/3.1.3/api/python/reference/pyspark.sql.html#grouping

# Imports

In [None]:
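# Import the libraries needed to run the PySpark code and to build sample data for the examples.
# `spark` (the SparkSession) is assumed to be provided by the notebook environment.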
import pyspark
from pyspark.sql import functions as f
from pyspark.sql import Window as win
import pandas as pd
import numpy as np
import datetime
import random



# Important principle to know

PySpark returns a new object after every operation. This means that if you do not assign the result back to a variable, the original DataFrame stays unchanged.

In [None]:
df = spark.createDataFrame(
  [{'id':1, 'val':1},
  {'id':1, 'val':2}]
)
df.show()

+---+---+
| id|val|
+---+---+
|  1|  1|
|  1|  2|
+---+---+



In [None]:
# If we apply a change without assigning the result, the change is not saved.
df.withColumn('val_plus', df.val+1).show()

+---+---+--------+
| id|val|val_plus|
+---+---+--------+
|  1|  1|       2|
|  1|  2|       3|
+---+---+--------+



In [None]:
# Original object hasn't changed
df.show()

+---+---+
| id|val|
+---+---+
|  1|  1|
|  1|  2|
+---+---+



In [None]:
# Assign the result back to keep the change
df = df.withColumn('val_plus', df.val+1)
df.show()

+---+---+--------+
| id|val|val_plus|
+---+---+--------+
|  1|  1|       2|
|  1|  2|       3|
+---+---+--------+



# Basic examples

## Accessing the tables

In [None]:
df = spark.table('data.china_data')
df.show(5)

+----------+----------+------+---------+------------+
|      date|  app_name|uv_cnt|leads_cnt|dcp_turnover|
+----------+----------+------+---------+------------+
|2020-02-01|MB One Web| 58897|      179|        null|
|2020-02-04|MB One Web| 79708|     1054|        null|
|2020-02-03|MB One Web| 80857|      656|        null|
|2020-02-02|MB One Web| 59425|      191|        null|
|2020-02-06|MB One Web| 67646|      727|        null|
+----------+----------+------+---------+------------+
only showing top 5 rows



## Filtering data

### How to filter

In [None]:
# Here are several possible approaches
# This one is the most flexible
df.filter( f.col('date') == datetime.datetime(2021,6,22) ).show(2)

+----------+------------------+------+---------+------------+
|      date|          app_name|uv_cnt|leads_cnt|dcp_turnover|
+----------+------------------+------+---------+------------+
|2021-06-22|MB Official WeChat|  null|      448|        null|
|2021-06-22|        MB One Web|141425|      309|        null|
+----------+------------------+------+---------+------------+



In [None]:
# You can also reference the column as an attribute of the DataFrame
df.filter( df.date == datetime.datetime(2021,6,22) ).show()

+----------+------------------+------+---------+------------+
|      date|          app_name|uv_cnt|leads_cnt|dcp_turnover|
+----------+------------------+------+---------+------------+
|2021-06-22|MB Official WeChat|  null|      448|        null|
|2021-06-22|        MB One Web|141425|      309|        null|
+----------+------------------+------+---------+------------+



In [None]:
# If the column name contains characters that prevent attribute-style access (for example, spaces), use bracket notation instead:
df.filter( df['date'] == datetime.datetime(2021,6,22) ).show()

+----------+------------------+------+---------+------------+
|      date|          app_name|uv_cnt|leads_cnt|dcp_turnover|
+----------+------------------+------+---------+------------+
|2021-06-22|MB Official WeChat|  null|      448|        null|
|2021-06-22|        MB One Web|141425|      309|        null|
+----------+------------------+------+---------+------------+



In [None]:
# Be careful when combining conditions that use ==, <=, >=, !=. Wrap each condition in (), because & and | bind more tightly than comparison operators in Python.
df.filter( 
  ( f.col('date') >= datetime.datetime(2021,6,22) )
  & ( f.col('app_name') == 'MB One Web'  )
).show(5)

+----------+----------+------+---------+------------+
|      date|  app_name|uv_cnt|leads_cnt|dcp_turnover|
+----------+----------+------+---------+------------+
|2021-06-26|MB One Web| 67999|       31|        null|
|2021-07-10|MB One Web|107830|      614|        null|
|2021-08-15|MB One Web| 81713|      641|        null|
|2021-08-03|MB One Web|195690|      798|        null|
|2021-06-22|MB One Web|141425|      309|        null|
+----------+----------+------+---------+------------+
only showing top 5 rows
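
Why the parentheses matter can be seen without Spark. In Python, `&` binds more tightly than comparison operators such as `==` and `>=`. The sketch below uses plain integers (an illustration only, not PySpark code) to show how the unparenthesized form is grouped.

```python
# Plain-Python illustration of operator precedence; no Spark involved.
# Without parentheses, `1 == 1 & 2 == 2` is parsed as the chained
# comparison `1 == (1 & 2) == 2`, because `&` is evaluated first.
unparenthesized = 1 == 1 & 2 == 2      # same as: 1 == 0 and 0 == 2
parenthesized = (1 == 1) & (2 == 2)    # same as: True & True

print(unparenthesized)  # False
print(parenthesized)    # True
```

The same grouping applies to Column objects, so a filter written without parentheses typically raises an error instead of applying the intended condition.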



In [None]:
# In simple cases a SQL-style string expression can be used:
df.filter(
  'date == "2021-06-22" or date == "2021-08-03"'
).show(5)

+----------+------------------+------+---------+------------+
|      date|          app_name|uv_cnt|leads_cnt|dcp_turnover|
+----------+------------------+------+---------+------------+
|2021-06-22|MB Official WeChat|  null|      448|        null|
|2021-06-22|        MB One Web|141425|      309|        null|
|2021-08-03|MB Official WeChat|  null|      198|        null|
|2021-08-03|        MB One Web|195690|      798|        null|
+----------+------------------+------+---------+------------+



### Check if column value is in the list

In [None]:
df.filter( f.col('app_name').isin('MB One Web', 'MB Official WeChat') ).show(5)

+----------+------------------+------+---------+------------+
|      date|          app_name|uv_cnt|leads_cnt|dcp_turnover|
+----------+------------------+------+---------+------------+
|2020-04-01|        MB One Web|235834|     1915|        null|
|2020-04-14|        MB One Web|231402|     1237|        null|
|2020-06-17|MB Official WeChat|  null|        1|        null|
|2020-06-17|        MB One Web|186732|     1129|        null|
|2020-04-15|        MB One Web|113462|     1280|        null|
+----------+------------------+------+---------+------------+
only showing top 5 rows



In [None]:
# A list of values can be passed as well.
vals = ['MB One Web', 'MB Official WeChat']
df.filter( f.col('app_name').isin(vals) ).show(5)

+----------+------------------+------+---------+------------+
|      date|          app_name|uv_cnt|leads_cnt|dcp_turnover|
+----------+------------------+------+---------+------------+
|2020-04-01|        MB One Web|235834|     1915|        null|
|2020-04-14|        MB One Web|231402|     1237|        null|
|2020-06-17|MB Official WeChat|  null|        1|        null|
|2020-06-17|        MB One Web|186732|     1129|        null|
|2020-04-15|        MB One Web|113462|     1280|        null|
+----------+------------------+------+---------+------------+
only showing top 5 rows



In [None]:
# Note that you can't pass a column of another DataFrame as the list of values for the .isin() method.
# Use a 'leftsemi' join instead. It is described below.

### Get opposite results

In [None]:
# You can negate any filter condition by adding '~'. For instance, this would return all rows where the value is not in the list of values
df.filter( ~f.col('app_name').isin('MB One Web', 'MB Official WeChat') ).show(5)

+----------+-------------------+------+---------+------------+
|      date|           app_name|uv_cnt|leads_cnt|dcp_turnover|
+----------+-------------------+------+---------+------------+
|2020-04-01| Mercedes me WeChat|  null|        1|        null|
|2020-04-01|AMG Official WeChat|  null|        1|        null|
|2020-04-14| Mercedes me WeChat|  null|        1|        null|
|2020-04-14|AMG Official WeChat|  null|        1|        null|
|2020-06-17| Mercedes me WeChat|  null|        1|        null|
+----------+-------------------+------+---------+------------+
only showing top 5 rows



### Equality filters

In [None]:
# Keep in mind that some simple approaches here could bring you unexpected results.
df_eq = spark.createDataFrame([
    {'id': 1, 'value': 'test' },
    {'id': 2, 'value': None }
])
df_eq.show()
print('--------')
df_eq.filter( f.col('value') != 'test_not' ).show()
print('Note that id 2 is missing, because null cannot be compared to other values')

+---+-----+
| id|value|
+---+-----+
|  1| test|
|  2| null|
+---+-----+

--------
+---+-----+
| id|value|
+---+-----+
|  1| test|
+---+-----+

Note that id 2 is missing, because null cannot be compared to other values


In [None]:
# The correct, null-safe filter here is:
df_eq.filter( ~f.col('value').eqNullSafe('test_not') ).show()

+---+-----+
| id|value|
+---+-----+
|  1| test|
|  2| null|
+---+-----+
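
The difference between `!=` and `eqNullSafe` comes from SQL's three-valued logic: a comparison with null yields null rather than true or false, and a filter keeps only rows where the condition is true. The following is a plain-Python model of that behavior, with `None` standing in for null. The helper names `sql_not_equal` and `eq_null_safe` are made up for this illustration and are not part of the Spark API.

```python
# Plain-Python model of SQL null semantics (an illustration, not the Spark API).
# None stands in for SQL null.

def sql_not_equal(a, b):
    """Model of `a != b`: the result is unknown (None) if either side is null."""
    if a is None or b is None:
        return None
    return a != b

def eq_null_safe(a, b):
    """Model of null-safe equality: always True or False, never null."""
    if a is None or b is None:
        return a is None and b is None
    return a == b

rows = [(1, "test"), (2, None)]

# A filter keeps a row only when the condition is exactly True.
plain = [r for r in rows if sql_not_equal(r[1], "test_not") is True]
null_safe = [r for r in rows if not eq_null_safe(r[1], "test_not")]

print(plain)      # [(1, 'test')]
print(null_safe)  # [(1, 'test'), (2, None)]
```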



### Filtering null values

In [None]:
df.filter( f.col('uv_cnt').isNull() ).show(5)

+----------+-------------------+------+---------+------------+
|      date|           app_name|uv_cnt|leads_cnt|dcp_turnover|
+----------+-------------------+------+---------+------------+
|2020-04-01| Mercedes me WeChat|  null|        1|        null|
|2020-04-01|AMG Official WeChat|  null|        1|        null|
|2020-04-14| Mercedes me WeChat|  null|        1|        null|
|2020-04-14|AMG Official WeChat|  null|        1|        null|
|2020-06-17| Mercedes me WeChat|  null|        1|        null|
+----------+-------------------+------+---------+------------+
only showing top 5 rows



In [None]:
df.filter( f.col('uv_cnt').isNotNull() ).show(5)

+----------+----------+------+---------+------------+
|      date|  app_name|uv_cnt|leads_cnt|dcp_turnover|
+----------+----------+------+---------+------------+
|2020-04-01|MB One Web|235834|     1915|        null|
|2020-04-14|MB One Web|231402|     1237|        null|
|2020-06-17|MB One Web|186732|     1129|        null|
|2020-04-15|MB One Web|113462|     1280|        null|
|2020-04-17|MB One Web|115934|     1902|        null|
+----------+----------+------+---------+------------+
only showing top 5 rows



### Filtering strings by regexp

In [None]:
df.filter( f.col('app_name').rlike('WeChat|Web') ).show(5)

+----------+-------------------+------+---------+------------+
|      date|           app_name|uv_cnt|leads_cnt|dcp_turnover|
+----------+-------------------+------+---------+------------+
|2020-04-01| Mercedes me WeChat|  null|        1|        null|
|2020-04-01|AMG Official WeChat|  null|        1|        null|
|2020-04-01|         MB One Web|235834|     1915|        null|
|2020-04-14| Mercedes me WeChat|  null|        1|        null|
|2020-04-14|AMG Official WeChat|  null|        1|        null|
+----------+-------------------+------+---------+------------+
only showing top 5 rows
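
`rlike` keeps a row when the pattern is found anywhere in the string; it does not require the whole value to match. Python's `re.search` behaves the same way for this simple pattern, so it can serve as a rough stand-in. Spark uses Java regular expressions, and the name `'Mercedes me App'` is a made-up value for the illustration.

```python
import re

# Hypothetical sample values; 'Mercedes me App' is not from the real table.
names = ["MB One Web", "AMG Official WeChat", "Mercedes me App"]

# Partial match: the pattern may occur anywhere in the string.
found = [n for n in names if re.search("WeChat|Web", n)]

# Anchoring with ^ and $ forces the whole string to match.
anchored = [n for n in names if re.search("^(WeChat|Web)$", n)]

print(found)     # ['MB One Web', 'AMG Official WeChat']
print(anchored)  # []
```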



## Select and modify the data

### Select

In [None]:
# Sometimes you need only certain columns from a DataFrame.
df.select( 'date', 'app_name' ).show(5)

+----------+----------+
|      date|  app_name|
+----------+----------+
|2020-02-01|MB One Web|
|2020-02-04|MB One Web|
|2020-02-03|MB One Web|
|2020-02-02|MB One Web|
|2020-02-06|MB One Web|
+----------+----------+
only showing top 5 rows



In [None]:
# To select distinct values apply .distinct()
df.select( 'date', 'app_name' ).distinct().show(5)

+----------+-------------------+
|      date|           app_name|
+----------+-------------------+
|2020-04-01| Mercedes me WeChat|
|2020-04-14|         MB One Web|
|2020-04-01|AMG Official WeChat|
|2020-04-14| Mercedes me WeChat|
|2020-04-01|         MB One Web|
+----------+-------------------+
only showing top 5 rows



### Add new columns

In [None]:
df.select(
  '*',
  (f.col('uv_cnt').isNotNull() & (f.col('uv_cnt') > 0)).alias('positive'),
  (f.col('leads_cnt')/1000).alias('thousands_leads')
).show(10)

+----------+----------+------+---------+------------+--------+---------------+
|      date|  app_name|uv_cnt|leads_cnt|dcp_turnover|positive|thousands_leads|
+----------+----------+------+---------+------------+--------+---------------+
|2020-02-01|MB One Web| 58897|      179|        null|    true|          0.179|
|2020-02-04|MB One Web| 79708|     1054|        null|    true|          1.054|
|2020-02-03|MB One Web| 80857|      656|        null|    true|          0.656|
|2020-02-09|MB One Web| 63857|     1437|        null|    true|          1.437|
|2020-02-08|MB One Web| 66596|     1659|        null|    true|          1.659|
|2020-02-02|MB One Web| 59425|      191|        null|    true|          0.191|
|2020-02-10|MB One Web|111734|     2106|        null|    true|          2.106|
|2020-02-06|MB One Web| 67646|      727|        null|    true|          0.727|
|2020-02-07|MB One Web| 75610|     1625|        null|    true|          1.625|
|2020-02-11|MB One Web|151158|     2284|        null|    true|          2.284|
+----------+----------+------+---------+------------+--------+---------------+

In [None]:
# Or you can use the dedicated withColumn method to add a new column
df.withColumn('thousands_leads', f.col('leads_cnt')/1000).show(5)

+----------+----------+------+---------+------------+---------------+
|      date|  app_name|uv_cnt|leads_cnt|dcp_turnover|thousands_leads|
+----------+----------+------+---------+------------+---------------+
|2020-02-01|MB One Web| 58897|      179|        null|          0.179|
|2020-02-04|MB One Web| 79708|     1054|        null|          1.054|
|2020-02-03|MB One Web| 80857|      656|        null|          0.656|
|2020-02-02|MB One Web| 59425|      191|        null|          0.191|
|2020-02-06|MB One Web| 67646|      727|        null|          0.727|
+----------+----------+------+---------+------------+---------------+
only showing top 5 rows



### Case-When analogue

In [None]:
df.select(
  '*',
  f.when( f.dayofweek('date') == 7, 'Don\'t work, please' )\
  .when( f.dayofweek('date') == 6, 'Ready to relax?' )\
  .otherwise('Do not stop').alias('exmpl')
)\
.sort('date', ascending=True)\
.show(10)

+----------+----------+------+---------+------------+------------------+
|      date|  app_name|uv_cnt|leads_cnt|dcp_turnover|             exmpl|
+----------+----------+------+---------+------------+------------------+
|2020-02-01|MB One Web| 58897|      179|        null|Don't work, please|
|2020-02-02|MB One Web| 59425|      191|        null|       Do not stop|
|2020-02-03|MB One Web| 80857|      656|        null|       Do not stop|
|2020-02-04|MB One Web| 79708|     1054|        null|       Do not stop|
|2020-02-05|MB One Web| 90161|     1289|        null|       Do not stop|
|2020-02-06|MB One Web| 67646|      727|        null|       Do not stop|
|2020-02-07|MB One Web| 75610|     1625|        null|   Ready to relax?|
|2020-02-08|MB One Web| 66596|     1659|        null|Don't work, please|
|2020-02-09|MB One Web| 63857|     1437|        null|       Do not stop|
|2020-02-10|MB One Web|111734|     2106|        null|       Do not stop|
+----------+----------+------+---------+------------+------------------+
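
The numbers 7 and 6 in the example rely on Spark's `dayofweek` numbering, where 1 is Sunday and 7 is Saturday. This differs from Python's `isoweekday`, where 1 is Monday. The helper `spark_dayofweek` below is a hypothetical plain-Python model of that numbering, written only to check the dates from the output above.

```python
import datetime

def spark_dayofweek(d):
    """Model of Spark's dayofweek numbering: 1 = Sunday, ..., 7 = Saturday."""
    # isoweekday() gives 1 = Monday, ..., 7 = Sunday, so shift by one day.
    return d.isoweekday() % 7 + 1

saturday = spark_dayofweek(datetime.date(2020, 2, 1))
sunday = spark_dayofweek(datetime.date(2020, 2, 2))
friday = spark_dayofweek(datetime.date(2020, 2, 7))

print(saturday, sunday, friday)  # 7 1 6
```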

### Delete columns

In [None]:
df.drop('leads_cnt', 'dcp_turnover').show()

+----------+------------------+------+
|      date|          app_name|uv_cnt|
+----------+------------------+------+
|2020-02-20|        MB One Web|191875|
|2020-02-01|        MB One Web| 58897|
|2020-02-04|        MB One Web| 79708|
|2020-02-19|        MB One Web|138781|
|2020-02-12|        MB One Web|121168|
|2020-02-03|        MB One Web| 80857|
|2020-02-09|        MB One Web| 63857|
|2020-02-08|        MB One Web| 66596|
|2020-02-02|        MB One Web| 59425|
|2020-02-10|        MB One Web|111734|
|2020-02-06|        MB One Web| 67646|
|2020-02-07|        MB One Web| 75610|
|2020-02-11|        MB One Web|151158|
|2020-02-18|        MB One Web| 96088|
|2020-02-13|        MB One Web|125912|
|2020-02-14|        MB One Web| 91866|
|2020-02-16|        MB One Web| 77041|
|2020-02-05|        MB One Web| 90161|
|2020-02-15|        MB One Web| 70624|
|2020-02-17|Mercedes me WeChat|  null|
+----------+------------------+------+
only showing top 20 rows



### Replace values
Full description: [link](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.replace.html)

In [None]:
#  Creating dataframe with dummy data
df = spark.createDataFrame(
  [{'id':x,'key':random.choice(['test', 'is', 'awesome']), 'some_string_value':random.choice(['pyspark', 'is', 'awesome', 'to', 'test'])} for x in range(10)]
)
print('Dataframe before replacing values')
display(df)

# Replace the value 'test' with 'MB.io' in every column of the DataFrame
print('Dataframe after replacing value in every column')
display(df.replace( to_replace='test', value='MB.io' ))

Dataframe before replacing values


id,key,some_string_value
0,awesome,is
1,awesome,pyspark
2,is,test
3,awesome,test
4,is,test
5,awesome,pyspark
6,test,awesome
7,test,pyspark
8,is,is
9,test,is


Dataframe after replacing value in every column


id,key,some_string_value
0,awesome,is
1,awesome,pyspark
2,is,MB.io
3,awesome,MB.io
4,is,MB.io
5,awesome,pyspark
6,MB.io,awesome
7,MB.io,pyspark
8,is,is
9,MB.io,is


In [None]:
# We can define in which columns we need to replace values
display(df.replace('test', 'MB.io', subset=['key']))

id,key,some_string_value
0,awesome,is
1,awesome,pyspark
2,is,test
3,awesome,test
4,is,test
5,awesome,pyspark
6,MB.io,awesome
7,MB.io,pyspark
8,is,is
9,MB.io,is


In [None]:
# And we can replace several values at once
print('Dataframe before replacing values')
display(df)
print('After replacement')
display(
  df.replace(['test', 'pyspark'], ['work', 'MB.io'], ['some_string_value'])
)

Dataframe before replacing values


id,key,some_string_value
0,awesome,is
1,awesome,pyspark
2,is,test
3,awesome,test
4,is,test
5,awesome,pyspark
6,test,awesome
7,test,pyspark
8,is,is
9,test,is


After replacement


id,key,some_string_value
0,awesome,is
1,awesome,MB.io
2,is,work
3,awesome,work
4,is,work
5,awesome,MB.io
6,test,awesome
7,test,MB.io
8,is,is
9,test,is


### Replace part of string values
Full description: [link](https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.regexp_replace.html)

In [None]:
#  Creating dataframe with dummy data
df = spark.createDataFrame(
  [{'id':x,'key':random.choice(['test', 'is', 'awesome']), 'some_string_value':random.choice(['pyspark', 'is', 'awesome', 'to', 'test'])} for x in range(10)]
)
print('Dataframe before replacing values')
display(df)

# Replace a trailing 'spark' with 'thon' in the column "some_string_value" ('$' anchors the pattern to the end of the string)
print('Dataframe after replacement')
df.withColumn('some_string_value', f.regexp_replace('some_string_value', 'spark$', 'thon')).display()

Dataframe before replacing values


id,key,some_string_value
0,is,awesome
1,test,pyspark
2,is,test
3,test,test
4,test,pyspark
5,test,awesome
6,awesome,pyspark
7,test,pyspark
8,test,to
9,is,pyspark


Dataframe after replacement


id,key,some_string_value
0,is,awesome
1,test,python
2,is,test
3,test,test
4,test,python
5,test,awesome
6,awesome,python
7,test,python
8,test,to
9,is,python
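
The `$` in the pattern `'spark$'` restricts the replacement to values that end with `spark`. The sketch below shows the effect with Python's `re.sub`, which behaves the same way for this simple pattern. Spark itself uses Java regular expressions, and `'sparkling'` is a made-up value added to show the difference.

```python
import re

# 'sparkling' is a hypothetical value, not present in the sample DataFrame.
values = ["pyspark", "sparkling", "test"]

# Anchored: only a trailing 'spark' is replaced.
anchored = [re.sub(r"spark$", "thon", v) for v in values]

# Unanchored: every occurrence of 'spark' is replaced.
unanchored = [re.sub(r"spark", "thon", v) for v in values]

print(anchored)    # ['python', 'sparkling', 'test']
print(unanchored)  # ['python', 'thonling', 'test']
```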


### Fill null values
https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.fillna.html

In [None]:
#  Creating dataframe with dummy data
df = spark.createDataFrame(
  [{'id':x,'key':random.choice(['test', 'is', 'awesome', None, None]), 'some_int_value':random.choice([2, 1, 5, None, None])} for x in range(10)]
)
print('Dataframe before replacing values')
display(df)

Dataframe before replacing values


id,key,some_int_value
0,,
1,is,
2,awesome,5.0
3,,
4,is,2.0
5,,5.0
6,,1.0
7,is,5.0
8,,
9,,2.0


In [None]:
# Replacing all null values with "n/a"
df.fillna('n/a').display()

id,key,some_int_value
0,,
1,is,
2,awesome,5.0
3,,
4,is,2.0
5,,5.0
6,,1.0
7,is,5.0
8,,
9,,2.0


**ATTENTION**: values in the column some_int_value were not filled, because this column contains integer values and we passed a string. A fill value is applied only to columns of a matching type.<br><br>
For such cases use a mapping from column name to fill value:

In [None]:
# Replacing null values in the specific column(s)
df.fillna({'key':'n/a', 'some_int_value': 0}).display()

id,key,some_int_value
0,,0
1,is,0
2,awesome,5
3,,0
4,is,2
5,,5
6,,1
7,is,5
8,,0
9,,2


### Concatenate values

In [None]:
#  Creating dataframe with dummy data
df = spark.createDataFrame(
  [
  {
    'id':x,
    'key':random.choice(['test', 'is', 'awesome', None, None]),
    'query':random.choice(['one_query', 'two_query', 'three_query'])
  } 
  for x in range(10)
  ]
)
print('Dataframe before concatenating values')
display(df)

Dataframe before concatenating values


id,key,query
0,,two_query
1,,one_query
2,is,three_query
3,awesome,three_query
4,,three_query
5,test,one_query
6,,one_query
7,awesome,one_query
8,,one_query
9,,three_query


f.concat() will concatenate listed columns.<br>
f.concat_ws() will do the same, but with a given separator.

In [None]:
(
  df
  .withColumn('f_concat', f.concat('id', 'key', 'query'))
  .withColumn('f_concat_ws', f.concat_ws('||', 'id', 'key', 'query'))
).display()

id,key,query,f_concat,f_concat_ws
0,,two_query,,0||two_query
1,,one_query,,1||one_query
2,is,three_query,2isthree_query,2||is||three_query
3,awesome,three_query,3awesomethree_query,3||awesome||three_query
4,,three_query,,4||three_query
5,test,one_query,5testone_query,5||test||one_query
6,,one_query,,6||one_query
7,awesome,one_query,7awesomeone_query,7||awesome||one_query
8,,one_query,,8||one_query
9,,three_query,,9||three_query


**Notice how differently these two functions work with null values:**<br> f.concat() will return null if any of the columns is null, while f.concat_ws() will skip the null value and join the remaining columns.
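
The null handling of the two functions can be modeled in a few lines of plain Python, with `None` standing in for null. The functions `concat` and `concat_ws` below are simplified stand-ins written for this illustration, not the PySpark implementations.

```python
# Plain-Python model of the null handling (an illustration, not the Spark API).

def concat(*values):
    """Return None as soon as any input is None, otherwise join everything."""
    if any(v is None for v in values):
        return None
    return "".join(str(v) for v in values)

def concat_ws(sep, *values):
    """Skip None inputs and join the remaining values with the separator."""
    return sep.join(str(v) for v in values if v is not None)

with_null = (0, None, "two_query")
without_null = (2, "is", "three_query")

print(concat(*with_null))             # None
print(concat_ws("||", *with_null))    # 0||two_query
print(concat(*without_null))          # 2isthree_query
print(concat_ws("||", *without_null)) # 2||is||three_query
```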

## Join dataframes
Here you can find full description:
https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.join.html

In [None]:
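np.random.seed(123)
# Let's define some dummy data first.
# Note: only NumPy is seeded here; the keys come from Python's `random` module, so they vary between runs.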
df1 = spark.createDataFrame(
  [{'id':x,'key':random.choice('abcde'), 'value':np.random.randint(1000)} for x in range(10)]
)

df2 = spark.createDataFrame(
  [{'id':x,'key':random.choice('defg'), 'value':np.random.randint(1000)} for x in range(10)]
)

In [None]:
print('This is df1:')
# show() prints the table itself and returns None, so it is not wrapped in print()
df1.show()

This is df1:
+---+---+-----+
| id|key|value|
+---+---+-----+
|  0|  c|  510|
|  1|  b|  365|
|  2|  c|  382|
|  3|  b|  322|
|  4|  d|  988|
|  5|  a|   98|
|  6|  a|  742|
|  7|  b|   17|
|  8|  c|  595|
|  9|  a|  106|
+---+---+-----+


In [None]:
print('This is df2:')
# show() prints the table itself and returns None, so it is not wrapped in print()
df2.show()

This is df2:
+---+---+-----+
| id|key|value|
+---+---+-----+
|  0|  g|  123|
|  1|  e|  569|
|  2|  f|  214|
|  3|  g|  737|
|  4|  e|   96|
|  5|  g|  113|
|  6|  d|  638|
|  7|  e|   47|
|  8|  d|   73|
|  9|  d|  544|
+---+---+-----+


### Left / right join

In [None]:
df1.join(df2, 'key', 'left').show()

+---+---+-----+----+-----+
|key| id|value|  id|value|
+---+---+-----+----+-----+
|  c|  0|  510|null| null|
|  b|  1|  365|null| null|
|  c|  2|  382|null| null|
|  d|  4|  988|   9|  544|
|  d|  4|  988|   8|   73|
|  d|  4|  988|   6|  638|
|  b|  3|  322|null| null|
|  a|  5|   98|null| null|
|  a|  6|  742|null| null|
|  b|  7|   17|null| null|
|  c|  8|  595|null| null|
|  a|  9|  106|null| null|
+---+---+-----+----+-----+



In [None]:
# As the previous cell shows, non-key columns that share a name in both DataFrames keep that name after the join, which produces duplicate column names.
# Here is how you can rename df2 columns after join if needed
df1.join(df2, 'key', 'left')\
.select(
  df1.key,
  df1.id,
  df1.value,
  df2.id.alias('id_df2'),
  df2.value.alias('value_df2')
)\
.show()

+---+---+-----+------+---------+
|key| id|value|id_df2|value_df2|
+---+---+-----+------+---------+
|  c|  0|  510|  null|     null|
|  b|  1|  365|  null|     null|
|  c|  2|  382|  null|     null|
|  d|  4|  988|     9|      544|
|  d|  4|  988|     8|       73|
|  d|  4|  988|     6|      638|
|  b|  3|  322|  null|     null|
|  a|  5|   98|  null|     null|
|  a|  6|  742|  null|     null|
|  b|  7|   17|  null|     null|
|  c|  8|  595|  null|     null|
|  a|  9|  106|  null|     null|
+---+---+-----+------+---------+



In [None]:
# Or you can rename the df2 columns before the join
df1.join(
  df2.select( 'key', df2.id.alias('id_df2'), df2.value.alias('value_df2') ),
  'key',
  'left'
).show()

+---+---+-----+------+---------+
|key| id|value|id_df2|value_df2|
+---+---+-----+------+---------+
|  c|  0|  510|  null|     null|
|  b|  1|  365|  null|     null|
|  c|  2|  382|  null|     null|
|  d|  4|  988|     9|      544|
|  d|  4|  988|     8|       73|
|  d|  4|  988|     6|      638|
|  b|  3|  322|  null|     null|
|  a|  5|   98|  null|     null|
|  a|  6|  742|  null|     null|
|  b|  7|   17|  null|     null|
|  c|  8|  595|  null|     null|
|  a|  9|  106|  null|     null|
+---+---+-----+------+---------+



In [None]:
# Sometimes a DataFrame has so many columns that a list comprehension is handier than naming every column
df1.join(
  df2.select( 'key', *[df2[x].alias(x+'_df2') for x in df2.columns if x != 'key'] ),
  'key',
  'left'
).show()

+---+---+-----+------+---------+
|key| id|value|id_df2|value_df2|
+---+---+-----+------+---------+
|  c|  0|  510|  null|     null|
|  b|  1|  365|  null|     null|
|  c|  2|  382|  null|     null|
|  d|  4|  988|     9|      544|
|  d|  4|  988|     8|       73|
|  d|  4|  988|     6|      638|
|  b|  3|  322|  null|     null|
|  a|  5|   98|  null|     null|
|  a|  6|  742|  null|     null|
|  b|  7|   17|  null|     null|
|  c|  8|  595|  null|     null|
|  a|  9|  106|  null|     null|
+---+---+-----+------+---------+



In [None]:
# Right join
df1.join(
  df2.select( 'key', df2.id.alias('id_df2'), df2.value.alias('value_df2') ),
  'key',
  'right'
).show()

+---+----+-----+------+---------+
|key|  id|value|id_df2|value_df2|
+---+----+-----+------+---------+
|  g|null| null|     0|      123|
|  e|null| null|     1|      569|
|  f|null| null|     2|      214|
|  g|null| null|     3|      737|
|  e|null| null|     4|       96|
|  g|null| null|     5|      113|
|  d|   4|  988|     6|      638|
|  e|null| null|     7|       47|
|  d|   4|  988|     8|       73|
|  d|   4|  988|     9|      544|
+---+----+-----+------+---------+



### Leftsemi and Leftanti joins (excluding joins)
These join types do not add columns from the right DataFrame; they only filter the left one. `leftsemi` keeps the rows that match the join criteria, and `leftanti` keeps the rows that do not.

In [None]:
# This keeps only those rows of df1 whose keys are not present in df2
df1.join(df2, 'key', 'leftanti').show()

+---+---+-----+
|key| id|value|
+---+---+-----+
|  c|  0|  510|
|  b|  1|  365|
|  c|  2|  382|
|  b|  3|  322|
|  a|  5|   98|
|  a|  6|  742|
|  b|  7|   17|
|  c|  8|  595|
|  a|  9|  106|
+---+---+-----+



In [None]:
# This keeps only those rows of df1 whose keys are present in df2
df1.join(df2, 'key', 'leftsemi').show()

+---+---+-----+
|key| id|value|
+---+---+-----+
|  d|  4|  988|
+---+---+-----+
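
Both join types act as filters on the left DataFrame, which a plain-Python model makes visible. The sketch below reuses the `df1` rows and the `df2` keys printed earlier; it is an illustration of the semantics, not of how Spark executes the join.

```python
# Plain-Python model of leftsemi / leftanti (an illustration, not the Spark API).
# Rows are (id, key, value) tuples copied from the df1 output above.
df1_rows = [
    (0, "c", 510), (1, "b", 365), (2, "c", 382), (3, "b", 322), (4, "d", 988),
    (5, "a", 98), (6, "a", 742), (7, "b", 17), (8, "c", 595), (9, "a", 106),
]
# Distinct keys that appear in the df2 output above.
df2_keys = {"g", "e", "f", "d"}

# leftsemi: keep the left rows whose key exists on the right.
left_semi = [row for row in df1_rows if row[1] in df2_keys]

# leftanti: keep the left rows whose key does not exist on the right.
left_anti = [row for row in df1_rows if row[1] not in df2_keys]

print(left_semi)       # [(4, 'd', 988)]
print(len(left_anti))  # 9
```

A left row appears at most once in a semi join, even when several right rows share its key. With an inner join the row with key `d` would be repeated once per matching row of `df2`.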



### Inner
Ordinary inner join, nothing special

In [None]:
df1.join(df2,'key','inner').show()

+---+---+-----+---+-----+
|key| id|value| id|value|
+---+---+-----+---+-----+
|  d|  4|  988|  6|  638|
|  d|  4|  988|  8|   73|
|  d|  4|  988|  9|  544|
+---+---+-----+---+-----+



### Join with several conditions
Let's say you need to join on more advanced conditions.

In [None]:
# Joining by matching several keys
df1.join(df2, ['key', 'id'], 'left').show()

+---+---+-----+-----+
|key| id|value|value|
+---+---+-----+-----+
|  c|  0|  510| null|
|  b|  1|  365| null|
|  c|  2|  382| null|
|  b|  3|  322| null|
|  d|  4|  988| null|
|  a|  5|   98| null|
|  a|  6|  742| null|
|  b|  7|   17| null|
|  c|  8|  595| null|
|  a|  9|  106| null|
+---+---+-----+-----+



In [None]:
# Sometimes you might need more complicated conditions
df1.join(df2, (df1.key == df2.key) & (df1.value <= df2.value), 'left').show()

+---+---+-----+----+----+-----+
| id|key|value|  id| key|value|
+---+---+-----+----+----+-----+
|  0|  c|  510|null|null| null|
|  1|  b|  365|null|null| null|
|  2|  c|  382|null|null| null|
|  4|  d|  988|null|null| null|
|  3|  b|  322|null|null| null|
|  5|  a|   98|null|null| null|
|  6|  a|  742|null|null| null|
|  7|  b|   17|null|null| null|
|  8|  c|  595|null|null| null|
|  9|  a|  106|null|null| null|
+---+---+-----+----+----+-----+



In [None]:
# Note that with such complex join conditions the key columns are not merged,
# so the result always contains the columns from df1 and df2 separately.

In [None]:
# You may also encounter this form, which is equivalent to the previous one (conditions in a list are combined with AND)
df1.join(df2, [df1.key == df2.key, df1.value <= df2.value], 'left').show()

+---+---+-----+----+----+-----+
| id|key|value|  id| key|value|
+---+---+-----+----+----+-----+
|  0|  c|  510|null|null| null|
|  1|  b|  365|null|null| null|
|  2|  c|  382|null|null| null|
|  4|  d|  988|null|null| null|
|  3|  b|  322|null|null| null|
|  5|  a|   98|null|null| null|
|  6|  a|  742|null|null| null|
|  7|  b|   17|null|null| null|
|  8|  c|  595|null|null| null|
|  9|  a|  106|null|null| null|
+---+---+-----+----+----+-----+



## Grouping and calculating aggregation functions

In [None]:
df1.groupby('key')\
.agg(
  f.sum('value').alias('sum_value'), 
  f.percentile_approx('value', 0.5, accuracy=10000).alias('median_value'),
  f.count_distinct('value').alias('unique_values')
).show()

+---+---------+------------+-------------+
|key|sum_value|median_value|unique_values|
+---+---------+------------+-------------+
|  d|      988|         988|            1|
|  c|     1487|         510|            3|
|  b|      704|         322|            3|
|  a|      946|         106|            3|
+---+---------+------------+-------------+



In [None]:
# For grouping, of course, you can use several keys as well
df1.groupby( 'key', 'id' ).agg( f.sum('value').alias('sum_value') ).show()

+---+---+---------+
|key| id|sum_value|
+---+---+---------+
|  c|  0|      510|
|  b|  1|      365|
|  c|  2|      382|
|  b|  3|      322|
|  d|  4|      988|
|  a|  5|       98|
|  a|  6|      742|
|  b|  7|       17|
|  a|  9|      106|
|  c|  8|      595|
+---+---+---------+



In [None]:
# Aggregations can be applied to the whole dataset as well, without grouping
df1.agg(f.sum('value').alias('sum_value')).show()

+---------+
|sum_value|
+---------+
|     4125|
+---------+



In [None]:
# describe() gives a quick summary of common statistics
df1.describe('value').show()

+-------+-----------------+
|summary|            value|
+-------+-----------------+
|  count|               10|
|   mean|            412.5|
| stddev|306.0923063391173|
|    min|               17|
|    max|              988|
+-------+-----------------+



## Window functions

All the window functions we usually use in traditional SQL are available in PySpark as well.
You can find the list here — https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html

In [None]:
# Several examples
df1.select(
  '*',
  # Running total   
  f.sum('value').over(win.partitionBy().orderBy('id')).alias('running_total'),
  # Calculating the sum of current row and previous one
  f.sum('value').over(win.rowsBetween(-1, win.currentRow).orderBy('id')).alias('sum_current_and_previous'),
  # Previous value with "key" as partition
  f.lag('value').over(win.partitionBy('key').orderBy('id')).alias('previous_value_by_key'),
  # Last not null value
  f.last('value', ignorenulls=True).over(win.partitionBy('key').orderBy('id')).alias('last_not_null_value_by_key'),
  # Next value by key
  f.lead('value').over(win.partitionBy('key').orderBy('id')).alias('next_value_by_key'),
  # Collecting the set of unique values by key
  f.collect_set('value').over(win.partitionBy('key')).alias('all_vals_by_key')
).show()

+---+---+-----+-------------+------------------------+---------------------+--------------------------+-----------------+---------------+
| id|key|value|running_total|sum_current_and_previous|previous_value_by_key|last_not_null_value_by_key|next_value_by_key|all_vals_by_key|
+---+---+-----+-------------+------------------------+---------------------+--------------------------+-----------------+---------------+
|  0|  c|  510|          510|                     510|                 null|                       510|              382|[510, 595, 382]|
|  1|  b|  365|          875|                     875|                 null|                       365|              322| [322, 17, 365]|
|  2|  c|  382|         1257|                     747|                  510|                       382|              595|[510, 595, 382]|
|  3|  b|  322|         1579|                     704|                  365|                       322|               17| [322, 17, 365]|
|  4|  d|  988|         2567|     

In [None]:
# A bit more
df1.select(
  '*',
  # Calculating amount of unique values by key
  f.size(f.collect_set('value').over(win.partitionBy('key'))).alias('amount_of_vals_by_key'),
  # Mean value over the whole partition (the frame spans all rows of the key)
  f.mean('value').over(win.partitionBy('key').rangeBetween(win.unboundedPreceding, win.unboundedFollowing)).alias('avg_by_key')
).show()

+---+---+-----+---------------------+------------------+
| id|key|value|amount_of_vals_by_key|        avg_by_key|
+---+---+-----+---------------------+------------------+
|  5|  a|   98|                    3| 315.3333333333333|
|  6|  a|  742|                    3| 315.3333333333333|
|  9|  a|  106|                    3| 315.3333333333333|
|  1|  b|  365|                    3|234.66666666666666|
|  3|  b|  322|                    3|234.66666666666666|
|  7|  b|   17|                    3|234.66666666666666|
|  0|  c|  510|                    3| 495.6666666666667|
|  2|  c|  382|                    3| 495.6666666666667|
|  8|  c|  595|                    3| 495.6666666666667|
|  4|  d|  988|                    1|             988.0|
+---+---+-----+---------------------+------------------+
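
To make the frame logic of the first window example more concrete, here is a minimal pure-Python sketch (no Spark involved) of what `running_total` and `sum_current_and_previous` compute once the rows are ordered by `id`. The values are the first five `value` entries from the output above; the variable names are only illustrative.

```python
from itertools import accumulate

# First five `value` entries from the example output, already ordered by id
values = [510, 365, 382, 322, 988]

# Running total: the frame grows from the first row up to the current row
running_total = list(accumulate(values))

# rowsBetween(-1, currentRow): the frame holds the previous row (if any) and the current row
sum_current_and_previous = [
    sum(values[max(i - 1, 0): i + 1]) for i in range(len(values))
]

print(running_total)
print(sum_current_and_previous)
```

The first four results of each list match the rows shown in the output above, which is a quick way to check that a frame definition does what you expect.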



## Limit the data

In [None]:
# If you do not need the whole dataset, you can limit the number of rows
df.limit(10).show()

+---+-------+-----------+
| id|    key|      query|
+---+-------+-----------+
|  0|   null|  two_query|
|  1|   null|  one_query|
|  2|     is|three_query|
|  3|awesome|three_query|
|  4|   null|three_query|
|  5|   test|  one_query|
|  6|   null|  one_query|
|  7|awesome|  one_query|
|  8|   null|  one_query|
|  9|   null|three_query|
+---+-------+-----------+



In [None]:
# Or you can take a random sample of the data. It is useful when you develop some logic and need to run quick tests.

# How much data we need to extract. 0.1 means roughly 10% of the whole dataset.
# The fraction is not guaranteed to be exact, so the sample size can vary.
sample_fraction = 0.1
# The seed is optional. If it is set, you get the same rows every time you run the sampling.
# Otherwise the sample is different on every run. You might need to fix it for tests.
fixed_seed = 123

df.sample(
  # Set withReplacement to False if you do not want the same row to be extracted more than once
  withReplacement=False,
  fraction=sample_fraction,
  seed=fixed_seed
).show()

+---+----+-----------+
| id| key|      query|
+---+----+-----------+
|  4|null|three_query|
|  8|null|  one_query|
+---+----+-----------+
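
The size of a sample is only approximately `fraction` times the number of rows, because the fraction acts as a per-row probability. The pure-Python sketch below illustrates that idea and the effect of a fixed seed. It is not Spark's actual sampler, and `bernoulli_sample` is a hypothetical helper written for this example.

```python
import random

def bernoulli_sample(rows, fraction, seed=None):
    """Keep each row independently with probability `fraction`."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]

rows = list(range(10))
first_run = bernoulli_sample(rows, fraction=0.1, seed=123)
second_run = bernoulli_sample(rows, fraction=0.1, seed=123)

# The same seed reproduces the same sample; its size is only roughly fraction * len(rows)
print(first_run == second_run, len(first_run))
```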



## Display the data

In [None]:
# In Databricks notebooks you can display the data not only with .show(), but with display() as well.
# It provides some additional functionality, such as an interactive table.
display( df )

id,key,query
0,,two_query
1,,one_query
2,is,three_query
3,awesome,three_query
4,,three_query
5,test,one_query
6,,one_query
7,awesome,one_query
8,,one_query
9,,three_query


## Save, append and overwrite tables
Any PySpark DataFrame can be saved as a static table.

In [None]:
df = spark.createDataFrame(
  [{'id':x,'key':random.choice('abcde'), 'value':np.random.randint(1000)} for x in range(10)]
)

df.write.saveAsTable(
  # Mandatory to specify:
  # Table name
  name='analystsdb.ac_basics_of_spark_example',
  # Path to save files
  path='/mnt/analystsdb/test_table/ac_basics_of_spark_example',
  # Optional parameters:
  # 'append' to add data or 'overwrite' to replace the existing data
  mode='overwrite',
  # True if the schema has changed and must be overwritten; False if the schema should stay as it is.
  overwriteSchema=True,
  # Choose a column if partitioning is needed
  partitionBy='id'
)

spark.table('analystsdb.ac_basics_of_spark_example').show()

+---+---+-----+
| id|key|value|
+---+---+-----+
|  1|  c|  224|
|  4|  d|  339|
|  8|  d|  608|
|  2|  b|  111|
|  6|  c|  253|
|  7|  b|  420|
|  9|  c|  208|
|  5|  b|  846|
|  0|  c|  942|
|  3|  c|  409|
+---+---+-----+



## Create temporary view
You can create a temporary view, which can then be used in SQL queries.

In [None]:
df = spark.createDataFrame(
  [{'id':x,'key':random.choice('abcde'), 'value':np.random.randint(1000)} for x in range(10)]
)

df.createOrReplaceTempView('df_view')

In [None]:
%sql
-- Now you can use it in a SQL query like an ordinary data source.
SELECT
  *
FROM
  df_view

id,key,value
0,e,68
1,b,817
2,e,823
3,a,451
4,a,2
5,e,340
6,b,39
7,a,322
8,b,596
9,e,559


## Working with secrets

Some workflows or notebooks may require credentials or tokens in order to run. **These should never be kept in plaintext in the code**; they need to be stored in a secure location. For this, Databricks has secrets (https://docs.databricks.com/security/secrets/), which can be managed with the right authorization.

Secrets are key-value pairs that can be added, changed, or removed inside their scope. Our scope is **awareness-consideration**.

To add a secret, you can either use the Databricks CLI (as described in the documentation, if you are allowed to generate tokens or are authorized to manage the secret scope) or ask a Data Engineer.

To access a secret inside a notebook or job, use the `dbutils` utility inside Databricks: `secret = dbutils.secrets.get(scope="awareness-consideration", key="<KEY-NAME>")`.

## Using widgets
Widgets allow us to set a variable in the interface and change it easily.

In [None]:
"""
This code handles the case where the 'initial_load' widget has not been created yet.
If the widget does not exist, it is created with a default value of '1'.
If it exists, its current value is used.
"""

try:
  # Attempt to get the value of 'initial_load' from the widgets
  initial_load = dbutils.widgets.get('initial_load')
except Exception:
  # The widget does not exist yet, so create it with the default value '1'
  dbutils.widgets.text('initial_load', '1')
  initial_load = '1'

# Print the current state of 'initial_load'
print('Initial load state is: {}'.format(initial_load))

Initial load state is: 1
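
If several widgets are needed, the get-or-create pattern can be wrapped in a small helper. This is only a sketch: `get_or_create_widget` is a hypothetical helper, and `FakeWidgets` is an in-memory stand-in for `dbutils.widgets` so that the example also runs outside Databricks.

```python
def get_or_create_widget(widgets, name, default):
    """Return the widget value, creating a text widget with `default` if it is missing."""
    try:
        return widgets.get(name)
    except Exception:
        widgets.text(name, default)
        return default


class FakeWidgets:
    """Hypothetical in-memory stand-in for dbutils.widgets, used only to run this sketch."""

    def __init__(self):
        self._values = {}

    def get(self, name):
        if name not in self._values:
            raise ValueError('No widget named {}'.format(name))
        return self._values[name]

    def text(self, name, default_value):
        self._values[name] = default_value


widgets = FakeWidgets()
first_call = get_or_create_widget(widgets, 'initial_load', '1')   # the widget is created
second_call = get_or_create_widget(widgets, 'initial_load', '0')  # the existing value is kept
print(first_call, second_call)
```

In a Databricks notebook the call would presumably look like `get_or_create_widget(dbutils.widgets, 'initial_load', '1')`.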


# Additional tricks
Some additional stuff that is good to know.

## Using aliases to shorten names

In [None]:
# Let's define some dummy data first.
df_with_a_very_long_name1 = spark.createDataFrame(
  [{'id':x,'key':random.choice('abcde'), 'value':np.random.randint(1000)} for x in range(10)]
)

df_with_a_very_long_name2 = spark.createDataFrame(
  [{'id':x,'key':random.choice('defg'), 'value':np.random.randint(1000)} for x in range(10)]
)

In [None]:
df_with_a_very_long_name1.alias('l')\
.join(
  df_with_a_very_long_name2.alias('r'),
  f.col('l.key') == f.col('r.key'),
  'left'
).show()

+---+---+-----+----+----+-----+
| id|key|value|  id| key|value|
+---+---+-----+----+----+-----+
|  0|  b|  504|null|null| null|
|  1|  a|  957|null|null| null|
|  2|  a|  176|null|null| null|
|  3|  a|  135|null|null| null|
|  4|  e|  873|   5|   e|  290|
|  4|  e|  873|   0|   e|  865|
|  5|  c|   99|null|null| null|
|  6|  c|  380|null|null| null|
|  7|  c|  860|null|null| null|
|  8|  d|  180|   1|   d|  213|
|  9|  c|  358|null|null| null|
+---+---+-----+----+----+-----+



In [None]:
# This also becomes valuable if you want to join data on the fly,
# without assigning one of the DataFrames to a new variable
df1 = spark.createDataFrame(
  [{'id':x,'key':random.choice('abcde'), 'value':np.random.randint(1000)} for x in range(10)]
)
df2 = spark.createDataFrame(
  [{'id':x,'key':random.choice('defg'), 'value':np.random.randint(1000)} for x in range(10)]
)

df1.filter(df1.id > 2)\
.select('*', (df1.key>'a').alias('new_col'))\
.alias('d')\
.join(df2, [df2.key==df1.key, f.col('d.new_col')], 'inner')\
.show()

+---+---+-----+-------+---+---+-----+
| id|key|value|new_col| id|key|value|
+---+---+-----+-------+---+---+-----+
|  3|  d|  843|   true|  3|  d|  765|
|  4|  e|  418|   true|  0|  e|  484|
|  4|  e|  418|   true|  1|  e|  271|
|  4|  e|  418|   true|  6|  e|  582|
|  4|  e|  418|   true|  8|  e|  371|
|  8|  e|  717|   true|  0|  e|  484|
|  8|  e|  717|   true|  1|  e|  271|
|  8|  e|  717|   true|  6|  e|  582|
|  8|  e|  717|   true|  8|  e|  371|
+---+---+-----+-------+---+---+-----+



## Getting the first not null value and propagating it
When you need to propagate the first non-null value, you can either use window functions or calculate the values in a separate DataFrame and join them back. Each approach has pros and cons:

### Using Window Functions
**Pros:**
1. **Conciseness**: The process is more streamlined; you don't need to create a separate DataFrame and then perform a join.
2. **Performance**: It can be faster for large datasets, as Spark can process the window partitions in parallel.
3. **Atomicity**: The whole operation is done in a single step with SQL-like logic, which reduces the potential for errors during transformation.

**Cons:**
1. **Complexity**: Window functions can be harder to understand and debug, especially for those not familiar with SQL-like operations.
2. **Resource Consumption**: Window functions can lead to high memory usage, as the operation may require shuffling the data and holding it in memory.

### Calculating First Non-Null Values in a Separate DataFrame and Then Joining
**Pros:**
1. **Simplicity**: This approach can be easier to understand and debug, especially for those new to Spark or SQL-like operations.
2. **Modularity**: It lets you split the task into individual steps, which can be easier to manage, test, and reuse.
3. **Lower Memory Usage**: Since it relies on a plain aggregation rather than a window function, it can be less memory-intensive.

**Cons:**
1. **Performance**: It can be slower for larger datasets due to the overhead of creating a new DataFrame and joining it back.
2. **More Steps**: It involves more transformation steps, which means more potential points of failure.

In the end, the best approach depends on the specific use case, the size and distribution of the data, and the computational resources available.

In [None]:
# Generate dummy data
data = [("1", None), ("1", None), ("1", "abc"), ("1", None), ("2", None), ("2", "def"), ("2", None)]
df = spark.createDataFrame(data, ["id", "value"])

print('Original data:')
df.show()

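# Define the window specification. The frame covers the whole partition, and descending
# order puts non-null values first (nulls sort last in descending order).
windowSpec = win.partitionBy("id").orderBy(f.desc("value")).rowsBetween(win.unboundedPreceding, win.unboundedFollowing)
# Replace every value with the first non-null value in the same partition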
df = df.withColumn("value", f.first("value", ignorenulls=True).over(windowSpec))

print('Updated data:')
df.show()

Original data:
+---+-----+
| id|value|
+---+-----+
|  1| null|
|  1| null|
|  1|  abc|
|  1| null|
|  2| null|
|  2|  def|
|  2| null|
+---+-----+

Updated data:
+---+-----+
| id|value|
+---+-----+
|  1|  abc|
|  1|  abc|
|  1|  abc|
|  1|  abc|
|  2|  def|
|  2|  def|
|  2|  def|
+---+-----+



## Running SQL expressions inside a PySpark cell
You can use pieces of SQL code for custom calculations as well.

### Inside a method

In [None]:
df1.groupBy('key').agg(f.expr('percentile(value, 0.5)').alias('median')).show()

+---+------+
|key|median|
+---+------+
|  c| 394.0|
|  e| 497.0|
|  a| 837.0|
|  d| 843.0|
|  b| 826.0|
+---+------+



### Run a SQL query into a DataFrame

In [None]:
df = spark.sql("""
    SELECT
      *,
      if(leads_cnt>100, 'reached', 'not reached') as goal_reached
    FROM
      data.china_data
    LIMIT
      5
""")

df.show()

+----------+----------+------+---------+------------+------------+
|      date|  app_name|uv_cnt|leads_cnt|dcp_turnover|goal_reached|
+----------+----------+------+---------+------------+------------+
|2020-02-01|MB One Web| 58897|      179|        null|     reached|
|2020-02-04|MB One Web| 79708|     1054|        null|     reached|
|2020-02-03|MB One Web| 80857|      656|        null|     reached|
|2020-02-02|MB One Web| 59425|      191|        null|     reached|
|2020-02-05|MB One Web| 90161|     1289|        null|     reached|
+----------+----------+------+---------+------------+------------+



## Apply custom functions to a DataFrame

In [None]:
# returnType is the data type returned by the function. Data types are listed here:
# https://spark.apache.org/docs/3.1.1/api/python/reference/pyspark.sql.html#data-types
from pyspark.sql.types import DateType

@f.udf(returnType=DateType())
def some_function(day, month, year):
  """
    A dummy function with some arbitrary logic. Take it as an example showing that you can
    apply any custom Python logic to calculate the values you need.
  """
  # Day 0 is not a valid day of month, so fall back to 2
  day = day if day else 2
  date = pd.Timestamp('{} {} {}'.format(year, month, day))
  # For odd days, move back to the end of the previous month
  if day % 2:
    date = date - pd.tseries.offsets.MonthEnd()
  # Return a plain datetime.date, which is what DateType expects
  return date.date()

df = spark.createDataFrame(
  [{'day':x,'month':random.choice(['april', 'march', 'december', 'june']), 'year':np.random.randint(2019, 2022)} for x in range(10)]
)

df.select(
  '*',
  some_function(df.day, df.month, df.year).alias('custom_date')
).show()

+---+--------+----+-----------+
|day|   month|year|custom_date|
+---+--------+----+-----------+
|  0|december|2019| 2019-12-02|
|  1|   march|2021| 2021-02-28|
|  2|   april|2021| 2021-04-02|
|  3|    june|2019| 2019-05-31|
|  4|   april|2021| 2021-04-04|
|  5|   april|2020| 2020-03-31|
|  6|   april|2021| 2021-04-06|
|  7|   april|2020| 2020-03-31|
|  8|    june|2020| 2020-06-08|
|  9|december|2021| 2021-11-30|
+---+--------+----+-----------+



## Arrays and structures

### Sorting arrays with structure
https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.functions.sort_array.html

In [None]:
# Creating dataframe with structures inside array
df = (
    spark.createDataFrame(
        [(1, "Mike", 25), (2, "Zack", 30), (3, "Ashley", 30)], ["id", "name", "age"]
    )
    # Creating struct_column
    .withColumn("struct_col", f.struct(f.col("name"), f.col("id"), f.col("age")))
    # Collecting in array.
    .groupby("age").agg(f.collect_list("struct_col").alias("arrays_with_structs"))
)

df.display()

age,arrays_with_structs
30,"List(List(Zack, 2, 30), List(Ashley, 3, 30))"
25,"List(List(Mike, 1, 25))"


In [None]:
# Now sort by name in descending order. Structs are compared field by field, so the first field (name) decides the order.
df.withColumn(
  'sorted_arrays_with_structs',
  f.sort_array(f.col('arrays_with_structs'), asc=False)
).display()

age,arrays_with_structs,sorted_arrays_with_structs
30,"List(List(Zack, 2, 30), List(Ashley, 3, 30))","List(List(Zack, 2, 30), List(Ashley, 3, 30))"
25,"List(List(Mike, 1, 25))","List(List(Mike, 1, 25))"
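
The ordering of structs works much like the ordering of Python tuples, which are also compared element by element. The sketch below is pure Python and uses the same three fields as the example above. It is only an analogy for how `sort_array` treats structs, not Spark code.

```python
# Each tuple mirrors struct(name, id, age) from the example above
structs = [("Zack", 2, 30), ("Ashley", 3, 30)]

# Tuples, like structs, are compared element by element starting from the first one
descending = sorted(structs, reverse=True)
ascending = sorted(structs)

# To sort by another field, put it first (here: id), similar to reordering the struct fields
by_id = sorted((s[1], s[0], s[2]) for s in structs)
print(descending, ascending, by_id)
```

So if the array should be sorted by `id` instead of `name`, one option is to build the struct with `id` as its first field.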


### Getting data sample in array
Sometimes we want aggregated results grouped by general values (like event_category), but we also want to see example details from a more specific column (like event_label). We can do this by collecting the unique values with f.collect_set() and then keeping only the first few of them with f.slice().<br><br>
Docs:<br>
https://spark.apache.org/docs/3.2.0/api/python/reference/api/pyspark.sql.functions.collect_set.html<br>
https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.slice.html

In [None]:
df = (
  spark.table('data.ga_raw_kpis')
  .filter(
    # Limiting data to speed up test calculation
    f.col('partition_date').isin('2023-09-01')
    & f.col('ga_hits_custom_dimensions_8').isin('PT')
    & f.col('event_info_event_category').isNotNull()
  )
  .groupBy('event_info_event_category', 'event_info_event_action')
  .agg(
    # Slicing first 5 examples of labels
    f.slice(
      # Collecting unique values for labels
      f.collect_set('event_info_event_label'),
      1,
      5
    ).alias('event_label_examples'),
    f.count_distinct('session_id').alias('sessions')
  )
)

df.sort('sessions', ascending=False).display()

event_info_event_category,event_info_event_action,event_label_examples,sessions
impression,Target,"List(simpleTeaserElement_active, vmosfiltering_active, topbanner_active, pi-evolution-active, contactform_active)",2571
form,multiselector3-pi-pad-page,"List(form_error, form_engaged, form_started, form_submitted)",2511
impression,model_overview,"List(filtered, not_filtered)",2058
feature,stage,"List(rooftop, Fullscreen, line_toggle, ThreeSixtyToggle, StageView)",1723
link,owc-header|owc-header,"List(our_models.saloon.c-class, our_models.4-door-coupe.gt-class, our_models.maybach, our_models.marco-polo.marco-polo, our_models.estate.c-class)",1428
link,navigation,"List(blue_button, right_arrow, left_arrow, navi_bar.submenu, x_250_d_4matic_progressive)",1354
link,owc-simple-stage|generic,"List(para_a_mercedes_me_id, ativar_agora, 联系, configurador, show_available_vehicles)",1343
link,model_gallery,"List(voc_sortby, voc_pagination, voc_request_offer, test-drive, product-page)",1065
feature,item_tile,"List(item_change.added|SA-942|199.99, item_change.added|SA-H46|0, item_change.added|SA-899|399.99, item_change.added|recommended|SA-201|2350, item_change.added|LU-914|0)",1021
feature,feasibility_conflict,"List(feasibility_load.added|amg|SA-RWE|0|31149.99|2.26|1.0|0.0, item_change.added|SA-735|0|19799.98|5.22|0.0|0.0, feasibility_load.added|SA-811|6549.99|15799.98|2.22|2.0|0.0, feasibility_abort.added|PC-P23|0|10900|1.32|1.0|0.0, feasibility_abort.added|SA-553|449.99|1649.99|2.0|0.0|0.0)",889


# Some simple analysis examples

## Extract AB test branch (string split)
Values in cd62 (`ga_hits_custom_dimensions_62`) that we're going to work with:
* AB-Test|VMOS|VMOS_Gallery_View|Target 
* AB-Test|VMOS|VMOS_Gallery_View|Control

In [None]:
# Importing data
ga_raw_database = spark.conf.get('spark.database.name')
raw_data = spark.table( ga_raw_database + '.ga_raw_kpis' )\
.filter(
  f.col('partition_date').between(datetime.date(2022,1,19), datetime.date(2022,2,1))
  & f.col('ga_hits_custom_dimensions_8').isin('DE', 'BE', 'NL', 'LU', 'ZA', 'TR')
)

# Now let's find sessions, related to the test and define branches for users.
ab_sessions = raw_data\
.filter(
  f.col('ga_hits_custom_dimensions_62').rlike(r'AB-Test\|VMOS\|VMOS_Gallery_View\|(Control|Target)')
)\
.select(
  'session_id',
  'full_visitor_id',
  # More info about the extract function can be found here: https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.regexp_extract.html
  f.regexp_extract(f.col('ga_hits_custom_dimensions_62'), r'.*(AB-Test\|VMOS\|VMOS_Gallery_View\|)(?:Control|Target).*', 1).alias('ab_name'),
  f.regexp_extract(f.col('ga_hits_custom_dimensions_62'), r'.*AB-Test\|VMOS\|VMOS_Gallery_View\|(Control|Target).*', 1).alias('ab_branch'),
  # Another approach is the split function, if we just need the value at a given position after splitting the string by a delimiter
  # https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.functions.split.html
  f.split(f.col('ga_hits_custom_dimensions_62'), r'\|', -1)[3].alias('ab_branch_split'),
  'ga_hits_custom_dimensions_62'
).distinct()

display(ab_sessions.limit(10))

session_id,full_visitor_id,ab_name,ab_branch,ab_branch_split,ga_hits_custom_dimensions_62
2625889864787602575|1643735257|2022-02-01,2625889864787602575,AB-Test|VMOS|VMOS_Gallery_View|,Target,Target,AB-Test|VMOS|VMOS_Gallery_View|Target
74376156466973270|1643711062|2022-02-01,74376156466973270,AB-Test|VMOS|VMOS_Gallery_View|,Control,Control,AB-Test|VMOS|VMOS_Gallery_View|Control
6651366804985305507|1643731363|2022-02-01,6651366804985305507,AB-Test|VMOS|VMOS_Gallery_View|,Target,Target,AB-Test|VMOS|VMOS_Gallery_View|Target
1845858795209178000|1643723664|2022-02-01,1845858795209178000,AB-Test|VMOS|VMOS_Gallery_View|,Control,Control,AB-Test|VMOS|VMOS_Gallery_View|Control
587922638678449957|1643714608|2022-02-01,587922638678449957,AB-Test|VMOS|VMOS_Gallery_View|,Target,Target,AB-Test|VMOS|VMOS_Gallery_View|Target
8969900102528888463|1643736718|2022-02-01,8969900102528888463,AB-Test|VMOS|VMOS_Gallery_View|,Target,Target,AB-Test|VMOS|VMOS_Gallery_View|Target
3083825905019997116|1643701029|2022-02-01,3083825905019997116,AB-Test|VMOS|VMOS_Gallery_View|,Target,Target,AB-Test|VMOS|VMOS_Gallery_View|Target
1551714949164699887|1643747663|2022-02-01,1551714949164699887,AB-Test|VMOS|VMOS_Gallery_View|,Target,Target,AB-Test|VMOS|VMOS_Gallery_View|Target
2762949194686614064|1643705691|2022-02-01,2762949194686614064,AB-Test|VMOS|VMOS_Gallery_View|,Target,Target,AB-Test|VMOS|VMOS_Gallery_View|Target
873716634504211793|1643720015|2022-02-01,873716634504211793,AB-Test|VMOS|VMOS_Gallery_View|,Control,Control,AB-Test|VMOS|VMOS_Gallery_View|Control
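
The capture-group logic can be checked quickly outside Spark with Python's `re` module. Spark uses Java regular expressions, but for patterns as simple as these the behaviour should be the same. The sketch below applies both extraction approaches to one of the cd62 values listed above; a non-capturing group is written `(?:...)` and does not get a group index.

```python
import re

value = 'AB-Test|VMOS|VMOS_Gallery_View|Target'

# Group 1 is the test name prefix; (?:...) is a non-capturing group, so it gets no index
name_pattern = r'.*(AB-Test\|VMOS\|VMOS_Gallery_View\|)(?:Control|Target).*'
# Here group 1 is the branch itself
branch_pattern = r'.*AB-Test\|VMOS\|VMOS_Gallery_View\|(Control|Target).*'

ab_name = re.match(name_pattern, value).group(1)
ab_branch = re.match(branch_pattern, value).group(1)
# Splitting by the literal "|" and taking the element at index 3 gives the same branch
ab_branch_split = value.split('|')[3]

print(ab_name, ab_branch, ab_branch_split)
```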


## Calculate click rates

In [None]:
# Calculating CTR for CTAs in the promo block on the main page.
data = spark.table( '{}.ga_raw_kpis'.format( spark.conf.get('spark.database.name') ) )\
.filter(
  f.col('partition_date').between(datetime.date(2022,9,1), datetime.date(2022,9,3))
  & f.col('page_page_path').isin('/', '/passengercars.html')
  & (
    f.col('type').isin('PAGE')
    | (
      f.col('event_info_event_category').isin('link')
      & f.col('event_info_event_action').isin('owc-simple-stage|owc-simple-stage')
    )
  )
)\
.select(
  f.when(
    f.col('type').isin('PAGE'), 
    f.col('full_visitor_id')
  ).alias('page_viewers'),
  f.when(
    f.col('event_info_event_category').isNotNull(),
    f.col('full_visitor_id')
  ).alias('clicked_on_cta')
)

# Now let's calculate CTRs by page views and by unique users
data.agg(
  f.count('page_viewers').alias('page_viewed'),
  f.count('clicked_on_cta').alias('clicks'),
  f.round(f.count('clicked_on_cta') / f.count('page_viewers') * 100, 1).alias('CTR_per_view_%'),
  f.count_distinct('page_viewers').alias('users_page_viewers'),
  f.count_distinct('clicked_on_cta').alias('users_clickers'),
  f.round(f.count_distinct('clicked_on_cta') / f.count_distinct('page_viewers') * 100, 1).alias('CTR_per_user_%')
).show()

print('Just in case, to show that null values are not counted, here is the number of rows: {}. So it doesn\'t match the number of page views.'.format(data.count()))

+-----------+------+--------------+------------------+--------------+--------------+
|page_viewed|clicks|CTR_per_view_%|users_page_viewers|users_clickers|CTR_per_user_%|
+-----------+------+--------------+------------------+--------------+--------------+
|     259137|  2578|           1.0|            168108|          2380|           1.4|
+-----------+------+--------------+------------------+--------------+--------------+

Just in case, to show that null values are not counted, here is the number of rows: 261715. So it doesn't match the number of page views.
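
As a sanity check, the rates can be recomputed by hand from the counts in the output above. The short sketch below only repeats the arithmetic in plain Python; all figures are copied from that output.

```python
# Figures copied from the output above
page_views, clicks = 259137, 2578
users_viewers, users_clickers = 168108, 2380
total_rows = 261715

ctr_per_view = round(clicks / page_views * 100, 1)
ctr_per_user = round(users_clickers / users_viewers * 100, 1)

# In this data the two non-null counts add up exactly to the number of rows,
# because f.count() skips the nulls produced by f.when() without otherwise()
rows_match = page_views + clicks == total_rows
print(ctr_per_view, ctr_per_user, rows_match)
```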


## Common previous / next pages
This section shows how to calculate which pages are the most common previous or next pages for a certain page of the website. For example, let's calculate this for '/passengercars/models.html'.

In [None]:
data = spark.table( '{}.ga_raw_kpis'.format( spark.conf.get('spark.database.name') ) )\
.filter(
  f.col('partition_date').between(datetime.date(2022,9,1), datetime.date(2022,9,3))
  & f.col('type').isin('PAGE')
)\
.select(
  'session_id',
  'hit_number',
  'page_page_path',
  # To limit calculations, we calculate previous and next pageviews only for the rows
  # that relate to the page we are researching.
  f.when(
    f.col('page_page_path').isin('/passengercars/models.html'),
    f.lag('page_page_path').over(win.partitionBy('session_id').orderBy('hit_number'))
  ).alias('previous_page'),
  f.when(
    f.col('page_page_path').isin('/passengercars/models.html'),
    f.lead('page_page_path').over(win.partitionBy('session_id').orderBy('hit_number'))
  ).alias('next_page')
)\
.filter( f.col('page_page_path').isin('/passengercars/models.html') )

# Now let's calculate which pages are the most popular ones
hits_with_previous_page_num = data.filter(f.col('previous_page').isNotNull()).count()
hits_with_next_page_num = data.filter(f.col('next_page').isNotNull()).count()

previous_pages = data\
.filter(f.col('previous_page').isNotNull())\
.groupBy('previous_page')\
.agg(
  f.count('hit_number').alias('hits_as_previous')
)\
.withColumn('share_as_previous_%', f.round(f.col('hits_as_previous')/hits_with_previous_page_num*100, 1))\
.withColumnRenamed('previous_page', 'page')

next_pages = data\
.filter(f.col('next_page').isNotNull())\
.groupBy('next_page')\
.agg(
  f.count('hit_number').alias('hits_as_next')
)\
.withColumn('share_as_next_%', f.round(f.col('hits_as_next')/hits_with_next_page_num*100, 1))\
.withColumnRenamed('next_page', 'page')

# Joining the data to show altogether
display( previous_pages.join(next_pages, 'page', 'full').sort('share_as_previous_%', ascending=False) )

page,hits_as_previous,share_as_previous_%,hits_as_next,share_as_next_%
/,17853.0,17.8,1269.0,1.3
/passengercars.html,15765.0,15.7,1450.0,1.5
/passengercars/models.html,9074.0,9.0,9074.0,9.5
/passengercars/mercedes-benz-cars/car-configurator.html/configuration/summary,2225.0,2.2,68.0,0.1
/passengercars/mercedes-benz-cars/car-configurator.html/motorization/OFFROADER,1700.0,1.7,3570.0,3.7
/passengercars/content-pool/marketing-pool/contact-forms/alwayson-new/search/c-class-saloon.html,1041.0,1.0,28.0,0.0
/passengercars/models/hatchback/a-class/overview.html,1018.0,1.0,4459.0,4.7
/passengercars/mercedes-benz-cars/models/c-class/saloon-w206/explore.html,912.0,0.9,3142.0,3.3
/passengercars/mercedes-benz-cars/models/eqa/explore.html,741.0,0.7,2364.0,2.5
/passengercars/buy/new-car/search-results.html/,636.0,0.6,418.0,0.4
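
The lag/lead logic used above can be illustrated without Spark. The pure-Python sketch below uses a handful of made-up page views (the session ids and the `/configurator` path are hypothetical) and counts previous and next pages for the target page within each session.

```python
from collections import Counter

# Hypothetical page views: (session_id, hit_number, page_path)
hits = [
    ('s1', 1, '/'),
    ('s1', 2, '/passengercars/models.html'),
    ('s1', 3, '/configurator'),
    ('s2', 1, '/passengercars.html'),
    ('s2', 2, '/passengercars/models.html'),
    ('s3', 1, '/'),
    ('s3', 2, '/passengercars/models.html'),
]
target = '/passengercars/models.html'

# Group page paths by session, ordered by hit number (the window partition and ordering)
sessions = {}
for session_id, hit_number, path in sorted(hits):
    sessions.setdefault(session_id, []).append(path)

previous_pages, next_pages = Counter(), Counter()
for paths in sessions.values():
    for i, path in enumerate(paths):
        if path != target:
            continue
        if i > 0:                   # lag: the row before, if there is one
            previous_pages[paths[i - 1]] += 1
        if i < len(paths) - 1:      # lead: the row after, if there is one
            next_pages[paths[i + 1]] += 1

print(previous_pages.most_common(), next_pages.most_common())
```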


## Internal/External traffic for a page
This is a similar calculation to what was shown [here](https://adb-7939824218364777.17.azuredatabricks.net/?o=7939824218364777#notebook/3855627078257489/command/4060234124965464) as a basic example.

In [None]:
spark.table( '{}.ga_raw_kpis'.format( spark.conf.get('spark.database.name') ) )\
.filter( f.col('partition_date').between( datetime.date(2022, 9, 1), datetime.date(2022, 9, 12) ) )\
.select(
  f.when(
    f.col('hit_number') == 1,
    'external_traffic'
  ).otherwise('internal_traffic').alias('traffic_type'),
  'session_id'
)\
.groupby('traffic_type')\
.agg(f.count_distinct('session_id').alias('sessions')).show()

+----------------+--------+
|    traffic_type|sessions|
+----------------+--------+
|internal_traffic| 3684281|
|external_traffic| 5084991|
+----------------+--------+



## Attributing leads to applications
Here we attribute leads to an application that does not generate a lead event itself. For instance, let's calculate the share of leads that were preceded by VMOS engagement within the same session.

In [None]:
spark.table( '{}.ga_raw_kpis'.format( spark.conf.get('spark.database.name') ) )\
.filter( 
  f.col('partition_date').between( datetime.date(2022, 9, 1), datetime.date(2022, 9, 12) ) 
  & (
    (f.col('ga_vehicle_leads') == 1)
    | f.col('engagers_model_overview').isNotNull()
  )
)\
.select(
  f.when(
    f.col('ga_vehicle_leads') == 1,
    f.col('full_visitor_id')
  ).alias('lead_full_visitor_id'),
  f.when(
    f.col('ga_vehicle_leads') == 1,
    f.lag('engagers_model_overview').over(win.partitionBy('session_id').orderBy('hit_number'))
  ).alias('vmos_engager')
)\
.agg(
  f.count_distinct('lead_full_visitor_id').alias('all_leads'),
  f.count_distinct('vmos_engager').alias('leads_after_vmos')
)\
.withColumn('vmos_engagers_share_%', f.round(f.col('leads_after_vmos') / f.col('all_leads') * 100, 0))\
.show()

+---------+----------------+---------------------+
|all_leads|leads_after_vmos|vmos_engagers_share_%|
+---------+----------------+---------------------+
|    12265|            4412|                 36.0|
+---------+----------------+---------------------+



## Persist a value until the next value in the same column
For instance, we can carry the time of the last pageview forward until the next pageview happens.
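
The idea is a forward fill: keep the value on the rows where it is defined and let every following row inherit the most recent one. A minimal pure-Python sketch with made-up hits of one session (`forward_fill` is a hypothetical helper; in Spark the same effect comes from `f.last(..., ignorenulls=True)` over an ordered window):

```python
def forward_fill(values):
    """Replace each None with the most recent non-None value seen so far."""
    filled, last_seen = [], None
    for value in values:
        if value is not None:
            last_seen = value
        filled.append(last_seen)
    return filled

# Made-up hits ordered by hit number: (hit_number, type, time)
hits = [
    (1, 'PAGE', 0),
    (2, 'EVENT', 1438),
    (3, 'EVENT', 11866),
    (4, 'PAGE', 12222),
    (5, 'EVENT', 18962),
]

# Keep the time only for pageviews, then carry it forward to the following events
pageview_time = [time if hit_type == 'PAGE' else None for _, hit_type, time in hits]
carried = forward_fill(pageview_time)
print(carried)
```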

In [None]:
# First, let's find sessions with a lot of pageviews and events to pick one as an example
spark.table( '{}.ga_raw_kpis'.format( spark.conf.get('spark.database.name') ) )\
.filter( 
  f.col('partition_date').between( datetime.date(2022, 9, 1), datetime.date(2022, 9, 2) )
)\
.select(
 'session_id',
 f.when(
   f.col('type').isin('PAGE'),
   1
 ).alias('pw'),
  f.when(
    f.col('type').isin('EVENT'),
    1
  ).alias('ev')
)\
.groupby('session_id').agg(f.count('pw').alias('pw'), f.count('ev').alias('ev'))\
.sort('pw', ascending=False).show(truncate=False)

+-----------------------------------------+---+---+
|session_id                               |pw |ev |
+-----------------------------------------+---+---+
|7736295571405692956|1662017564|2022-09-01|500|0  |
|4716559668234745941|1662056534|2022-09-01|500|0  |
|502765957049094244|1662128227|2022-09-02 |500|0  |
|7205074946581729139|1662127987|2022-09-02|500|0  |
|6673723076147024483|1662128739|2022-09-02|500|0  |
|4468504635213182706|1662088945|2022-09-02|489|11 |
|8312237289790360073|1662153006|2022-09-02|485|15 |
|4501691817442743967|1662137459|2022-09-02|404|0  |
|6690161981868028046|1662070926|2022-09-02|404|39 |
|7948251009167416722|1662016913|2022-09-01|380|0  |
|3567059525986001707|1662100267|2022-09-02|321|17 |
|4019297257014211999|1662042277|2022-09-01|280|0  |
|7047073549417303132|1662110812|2022-09-02|276|0  |
|2702214173845193708|1662092269|2022-09-02|256|3  |
|9116751863395392356|1661999971|2022-09-01|246|0  |
|2674195069926298903|1662003169|2022-09-01|240|0  |
|39672032722

In [None]:
display(
  spark.table( '{}.ga_raw_kpis'.format( spark.conf.get('spark.database.name') ) )\
  .filter( 
    f.col('partition_date').between( datetime.date(2022, 9, 1), datetime.date(2022, 9, 2) )
    # Showing how it works using one session that had a lot of actions
    & f.col('session_id').isin('3096324607763965837|1662150586|2022-09-02')
  )\
  .select(
    'session_id',
    'page_page_path',
    'type',
    'hit_number',
    'time',
    f.when(
      f.col('type') == 'PAGE',
      f.col('time')
    ).alias('pageview_time')
  )\
  .select(
    'page_page_path',
    'type',
    'hit_number',
    'time',
    f.last('pageview_time', ignorenulls=True).over(win.partitionBy('session_id', 'page_page_path').orderBy('hit_number')).alias('pageview_time')
  ).sort('hit_number', ascending=True).limit(100)  # a single session is selected, so hit_number alone defines the order
)

page_page_path,type,hit_number,time,pageview_time
/passengercars/content-pool/tool-pages/car-configurator.html/bm/4632761,PAGE,1,0,0
/passengercars/content-pool/tool-pages/car-configurator.html/bm/4632761,EVENT,2,1438,0
/passengercars/content-pool/tool-pages/car-configurator.html/bm/4632761,EVENT,3,11866,0
/passengercars/content-pool/tool-pages/car-configurator.html/tr_TR__4632761_TR1__AJ-052_AU-501_GC-G44_LE-L_LU-775_MJ-X21_PC-EM4-HA1-P54-P76-PA8-PT1_PS-007#-011#-01U#-042#-069#-071#-075#-080#-081#-089#-103#-139#-169#-281#-287#-292#-294#-310#-345#-440#-581#-587#-589#-601#-602#-603#-772#-872#-873#-889#-A73#-B03#-C62#-C82#-C85#-E15#-EV1#-F11#-RR8#-S54#_SA-15B-968-A20-A53-BB3-BS2-C92-EA2-EH3-EM3-EU4-EU8-EZ8-EZ9-F15-F75-F90-FB1-FH5-FM2-FR2-FZ8-H04-J25-JA6-JA9-JC0-JP0-JS1-JW8-K81-L57-Q30-RB9-RG1-RY6-SH1-T84-U88-V43-V57-VL9-W72-Y16-ZA2-ZG2_SC-15U-2T0-7B4-8B8-8U8-961-989-998-B10-C59-EL0-EV1-EV3-EV4-FD7-IE8-K15-K31-K36-KA3-M92-MC1-MJ7-P40-S82-TB0-U01-U78-W11-X14-XN2-Y50-Y65-Y78/zk015,PAGE,4,12222,12222
/passengercars/content-pool/tool-pages/car-configurator.html/tr_TR__4632761_TR1__AJ-052_AU-501_GC-G44_LE-L_LU-775_MJ-X21_PC-EM4-HA1-P54-P76-PA8-PT1_PS-007#-011#-01U#-042#-069#-071#-075#-080#-081#-089#-103#-139#-169#-281#-287#-292#-294#-310#-345#-440#-581#-587#-589#-601#-602#-603#-772#-872#-873#-889#-A73#-B03#-C62#-C82#-C85#-E15#-EV1#-F11#-RR8#-S54#_SA-15B-968-A20-A53-BB3-BS2-C92-EA2-EH3-EM3-EU4-EU8-EZ8-EZ9-F15-F75-F90-FB1-FH5-FM2-FR2-FZ8-H04-J25-JA6-JA9-JC0-JP0-JS1-JW8-K81-L57-Q30-RB9-RG1-RY6-SH1-T84-U88-V43-V57-VL9-W72-Y16-ZA2-ZG2_SC-15U-2T0-7B4-8B8-8U8-961-989-998-B10-C59-EL0-EV1-EV3-EV4-FD7-IE8-K15-K31-K36-KA3-M92-MC1-MJ7-P40-S82-TB0-U01-U78-W11-X14-XN2-Y50-Y65-Y78/zk015,EVENT,5,18962,12222
/passengercars/content-pool/tool-pages/car-configurator.html/tr_TR__4632761_TR1__AJ-052_AU-501_GC-G44_LE-L_LU-775_MJ-X21_PC-EM4-HA1-P54-P76-PA8-PT1_PS-007#-011#-01U#-042#-069#-071#-075#-080#-081#-089#-103#-139#-169#-281#-287#-292#-294#-310#-345#-440#-581#-587#-589#-601#-602#-603#-772#-872#-873#-889#-A73#-B03#-C62#-C82#-C85#-E15#-EV1#-F11#-RR8#-S54#_SA-15B-968-A20-A53-BB3-BS2-C92-EA2-EH3-EM3-EU4-EU8-EZ8-EZ9-F15-F75-F90-FB1-FH5-FM2-FR2-FZ8-H04-J25-JA6-JA9-JC0-JP0-JS1-JW8-K81-L57-Q30-RB9-RG1-RY6-SH1-T84-U88-V43-V57-VL9-W72-Y16-ZA2-ZG2_SC-15U-2T0-7B4-8B8-8U8-961-989-998-B10-C59-EL0-EV1-EV3-EV4-FD7-IE8-K15-K31-K36-KA3-M92-MC1-MJ7-P40-S82-TB0-U01-U78-W11-X14-XN2-Y50-Y65-Y78/zk015,EVENT,6,21024,12222
/passengercars/content-pool/tool-pages/car-configurator.html/tr_TR__4632761_TR1__AJ-052_AU-537_GC-G44_LE-L_LU-197_MJ-X21_PC-EM4-HA1-P54-P56-P76-PA8-PAZ-PEO-PT1_PS-007#-011#-01U#-042#-069#-071#-075#-080#-081#-089#-103#-139#-169#-281#-287#-292#-294#-310#-345#-440#-581#-587#-589#-601#-602#-603#-772#-872#-873#-889#-A73#-B03#-C62#-C82#-C85#-E15#-EV1#-F11#-RR8#-S54#_SA-15B-7U8-968-A20-A53-BB3-BS2-C52-C55-C92-CK6-CS1-EA2-EH3-EM3-EU4-EU8-EZ8-EZ9-F15-F75-F90-FB1-FH5-FM2-FO1-FR2-FS0-FZ8-H04-J25-JA6-JA9-JC0-JP0-JS1-JW8-K81-L57-L6G-Q30-RB4-RJ0-RY6-SH1-T84-U66-U88-V57-VA7-W72-Y16-ZA2_SC-15U-2T0-5U7-7B4-8B8-8U8-961-989-998-B10-C59-EL0-EV1-EV3-EV4-FD7-IM7-K15-K31-K36-KA3-M92-MC1-MJ7-P40-S82-TB0-U01-U78-W12-X14-XN2-Y50-Y65-Y78/exterior_style,PAGE,7,22035,22035
/passengercars/content-pool/tool-pages/car-configurator.html/tr_TR__4632761_TR1__AJ-052_AU-537_GC-G44_LE-L_LU-197_MJ-X21_PC-EM4-HA1-P54-P56-P76-PA8-PAZ-PEO-PT1_PS-007#-011#-01U#-042#-069#-071#-075#-080#-081#-089#-103#-139#-169#-281#-287#-292#-294#-310#-345#-440#-581#-587#-589#-601#-602#-603#-772#-872#-873#-889#-A73#-B03#-C62#-C82#-C85#-E15#-EV1#-F11#-RR8#-S54#_SA-15B-7U8-968-A20-A53-BB3-BS2-C52-C55-C92-CK6-CS1-EA2-EH3-EM3-EU4-EU8-EZ8-EZ9-F15-F75-F90-FB1-FH5-FM2-FO1-FR2-FS0-FZ8-H04-J25-JA6-JA9-JC0-JP0-JS1-JW8-K81-L57-L6G-Q30-RB4-RJ0-RY6-SH1-T84-U66-U88-V57-VA7-W72-Y16-ZA2_SC-15U-2T0-5U7-7B4-8B8-8U8-961-989-998-B10-C59-EL0-EV1-EV3-EV4-FD7-IM7-K15-K31-K36-KA3-M92-MC1-MJ7-P40-S82-TB0-U01-U78-W12-X14-XN2-Y50-Y65-Y78/exterior_style,EVENT,8,31813,22035
/passengercars/content-pool/tool-pages/car-configurator.html/tr_TR__4632761_TR1__AJ-052_AU-537_GC-G44_LE-L_LU-197_MJ-X21_PC-EM4-HA1-P54-P56-P76-PA8-PAZ-PEO-PT1_PS-007#-011#-01U#-042#-069#-071#-075#-080#-081#-089#-103#-139#-169#-281#-287#-292#-294#-310#-345#-440#-581#-587#-589#-601#-602#-603#-772#-872#-873#-889#-A73#-B03#-C62#-C82#-C85#-E15#-EV1#-F11#-RR8#-S54#_SA-15B-7U8-968-A20-A53-BB3-BS2-C52-C55-C92-CK6-CS1-EA2-EH3-EM3-EU4-EU8-EZ8-EZ9-F15-F75-F90-FB1-FH5-FM2-FO1-FR2-FS0-FZ8-H04-J25-JA6-JA9-JC0-JP0-JS1-JW8-K81-L57-L6G-Q30-RB4-RJ0-RY6-SH1-T84-U66-U88-V57-VA7-W72-Y16-ZA2_SC-15U-2T0-5U7-7B4-8B8-8U8-961-989-998-B10-C59-EL0-EV1-EV3-EV4-FD7-IM7-K15-K31-K36-KA3-M92-MC1-MJ7-P40-S82-TB0-U01-U78-W12-X14-XN2-Y50-Y65-Y78/paints,PAGE,9,31917,31917
/passengercars/content-pool/tool-pages/car-configurator.html/tr_TR__4632761_TR1__AJ-052_AU-537_GC-G44_LE-L_LU-197_MJ-X21_PC-EM4-HA1-P54-P56-P76-PA8-PAZ-PEO-PT1_PS-007#-011#-01U#-042#-069#-071#-075#-080#-081#-089#-103#-139#-169#-281#-287#-292#-294#-310#-345#-440#-581#-587#-589#-601#-602#-603#-772#-872#-873#-889#-A73#-B03#-C62#-C82#-C85#-E15#-EV1#-F11#-RR8#-S54#_SA-15B-7U8-968-A20-A53-BB3-BS2-C52-C55-C92-CK6-CS1-EA2-EH3-EM3-EU4-EU8-EZ8-EZ9-F15-F75-F90-FB1-FH5-FM2-FO1-FR2-FS0-FZ8-H04-J25-JA6-JA9-JC0-JP0-JS1-JW8-K81-L57-L6G-Q30-RB4-RJ0-RY6-SH1-T84-U66-U88-V57-VA7-W72-Y16-ZA2_SC-15U-2T0-5U7-7B4-8B8-8U8-961-989-998-B10-C59-EL0-EV1-EV3-EV4-FD7-IM7-K15-K31-K36-KA3-M92-MC1-MJ7-P40-S82-TB0-U01-U78-W12-X14-XN2-Y50-Y65-Y78/paints,EVENT,10,40214,31917
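
To make the forward fill above easier to follow, here is a minimal plain-Python sketch of what `f.last(..., ignorenulls=True)` over that window computes. It is an illustration under assumptions, not the Spark implementation: the `forward_fill` helper, the `hits` sample, the `s1` session id and the shortened page paths are hypothetical. It also assumes that `pageview_time` is filled only on `PAGE` hits before the window is applied (which is what the output above suggests) and that `hit_number` is unique within a partition.

```python
def forward_fill(rows, partition_keys, order_key, value_key):
    """Return copies of rows where value_key holds the last non-None value
    seen so far within the row's partition, scanning in order_key order."""
    last_seen = {}  # partition -> most recent non-None value
    filled = []
    for row in sorted(rows, key=lambda r: r[order_key]):
        part = tuple(row[k] for k in partition_keys)
        if row[value_key] is not None:
            last_seen[part] = row[value_key]
        # A partition with no non-None value yet stays None.
        filled.append({**row, value_key: last_seen.get(part)})
    return filled


# Hypothetical sample shaped like the output above, with shortened paths.
hits = [
    {'session_id': 's1', 'page_page_path': '/configurator/a', 'type': 'PAGE',  'hit_number': 1, 'time': 0,     'pageview_time': 0},
    {'session_id': 's1', 'page_page_path': '/configurator/a', 'type': 'EVENT', 'hit_number': 2, 'time': 1438,  'pageview_time': None},
    {'session_id': 's1', 'page_page_path': '/configurator/a', 'type': 'EVENT', 'hit_number': 3, 'time': 11866, 'pageview_time': None},
    {'session_id': 's1', 'page_page_path': '/configurator/b', 'type': 'PAGE',  'hit_number': 4, 'time': 12222, 'pageview_time': 12222},
    {'session_id': 's1', 'page_page_path': '/configurator/b', 'type': 'EVENT', 'hit_number': 5, 'time': 18962, 'pageview_time': None},
    {'session_id': 's1', 'page_page_path': '/configurator/c', 'type': 'EVENT', 'hit_number': 6, 'time': 21024, 'pageview_time': None},
]

filled = forward_fill(
    hits,
    partition_keys=('session_id', 'page_page_path'),
    order_key='hit_number',
    value_key='pageview_time',
)

for row in filled:
    print(row['hit_number'], row['type'], row['time'], row['pageview_time'])
# pageview_time column after the fill: 0, 0, 0, 12222, 12222, None
```

The last row stays empty because its partition has no `PAGE` hit before it. The Spark window behaves the same way: values are never carried across partitions.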
