In [1]:
import builtins
from datetime import datetime

from pyspark.sql import SQLContext
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, LongType, Row

In [2]:
# Distribute a small list of [id, name, age] lists as an RDD.
records = sc.parallelize([
    [1, "Gaurav", 22],
    [2, "Deb", 23],
])
records

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195

In [3]:
# collect() brings every element back to the driver as a Python list.
records.collect()

[[1, 'Gaurav', 22], [2, 'Deb', 23]]

In [4]:
# Row factory: calling it with positional values yields Row(id=..., name=..., age=...).
column_names = Row('id', 'name', 'age')
records = records.map(lambda fields: column_names(*fields))

In [5]:
# After the map in the previous cell, each element is a Row with fields id, name, age.
records.collect()

[Row(id=1, name='Gaurav', age=22), Row(id=2, name='Deb', age=23)]

In [6]:
# count() returns the number of elements in the RDD.
records.count()

2

In [7]:
# take(2) returns the first two elements as a list (here that is the whole RDD).
records.take(2)

[Row(id=1, name='Gaurav', age=22), Row(id=2, name='Deb', age=23)]

In [8]:
# Convert the RDD of Rows to a DataFrame: column names come from the Row fields,
# and Python ints are inferred as bigint (see the repr below).
df = records.toDF()
df

DataFrame[id: bigint, name: string, age: bigint]

In [9]:
# show() prints the DataFrame as a text table.
df.show()

+---+------+---+
| id|  name|age|
+---+------+---+
|  1|Gaurav| 22|
|  2|   Deb| 23|
+---+------+---+



In [10]:
# Build the Rows up front with keyword fields instead of mapping raw lists.
named_records = sc.parallelize([
    Row(id=1, name="Gaurav", age=22),
    Row(id=2, name="Deb", age=23),
    Row(id=3, name="Sartaj", age=33),
])

In [11]:
# Note: the output lists the fields alphabetically (age, id, name), not in the keyword
# order used when building the Rows. This Spark version sorts keyword-constructed Row fields.
named_records.collect()

[Row(age=22, id=1, name='Gaurav'),
 Row(age=23, id=2, name='Deb'),
 Row(age=33, id=3, name='Sartaj')]

In [12]:
# The inferred schema follows the Rows' alphabetised field order (age, id, name).
named_df = named_records.toDF()
named_df

DataFrame[age: bigint, id: bigint, name: string]

In [13]:
# Display the three-row DataFrame.
named_df.show()

+---+---+------+
|age| id|  name|
+---+---+------+
| 22|  1|Gaurav|
| 23|  2|   Deb|
| 33|  3|Sartaj|
+---+---+------+



In [14]:
# Same people as above, now with a list-valued `score` field per Row.
named_records_with_score = sc.parallelize([
    Row(id=1, name="Gaurav", age=22, score=[34, 33]),
    Row(id=2, name="Deb", age=23, score=[34, 23]),
    Row(id=3, name="Sartaj", age=33, score=[14, 22]),
])

In [15]:
# The Python list field `score` becomes an array column (rendered as [34, 33] below).
complex_df = named_records_with_score.toDF()
complex_df.show()

+---+---+------+--------+
|age| id|  name|   score|
+---+---+------+--------+
| 22|  1|Gaurav|[34, 33]|
| 23|  2|   Deb|[34, 23]|
| 33|  3|Sartaj|[14, 22]|
+---+---+------+--------+



In [16]:
# SQLContext wrapping the existing SparkContext `sc`.
# NOTE(review): SQLContext is deprecated in Spark 3.x in favour of SparkSession.
sqlCtx = SQLContext(sparkContext=sc)

In [17]:
# Single `id` column holding 0..4; range(0, 5) is the same as range(5).
# Note: this rebinds `df`, replacing the DataFrame created in In [8].
df = sqlCtx.range(0, 5)
df

DataFrame[id: bigint]

In [18]:
# `df` is now the range DataFrame from In [17]; the end bound 5 is exclusive (ids 0..4).
df.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [19]:
# Plain (name, age) tuples; createDataFrame infers the column types from them.
sql_ctx_data = [
    ("Gaurav", 22),
    ("Deb", 23),
]

In [20]:
# Without a schema, tuple positions become columns named _1, _2.
sqlCtx.createDataFrame(sql_ctx_data).show()

+------+---+
|    _1| _2|
+------+---+
|Gaurav| 22|
|   Deb| 23|
+------+---+



In [21]:
# Passing column names replaces the default _1/_2 labels.
sqlCtx.createDataFrame(sql_ctx_data, schema=["Name", "Age"]).show()

+------+---+
|  Name|Age|
+------+---+
|Gaurav| 22|
|   Deb| 23|
+------+---+



In [22]:
# Ten sample records, each [name, scores, keys, date]: a string, a list of 10 ints,
# a dict of 5 single-letter keys -> int, and a datetime.
complex_data = [['udilctmebs',
  [988, 185, 484, 86, 734, 443, 90, 716, 988, 506],
  {'a': 54, 'n': 43, 'w': 55, 's': 2, 'h': 47},
  datetime(2010, 11, 1, 0, 0)],
 ['thwdwfeswz',
  [936, 728, 506, 916, 816, 691, 491, 55, 528, 610],
  {'z': 6, 'h': 69, 'q': 55, 'm': 17, 'r': 13},
  datetime(1983, 7, 13, 0, 0)],
 ['miusfkowwk',
  [371, 871, 203, 49, 657, 200, 530, 372, 302, 403],
  {'s': 4, 'o': 12, 'g': 2, 'c': 49, 'q': 4},
  datetime(1993, 4, 1, 0, 0)],
 ['zqgtupjqdh',
  [465, 742, 847, 416, 368, 534, 81, 974, 14, 344],
  {'r': 95, 'e': 13, 'c': 14, 'w': 20, 'o': 70},
  datetime(2001, 7, 12, 0, 0)],
 ['jywktnyhsm',
  [187, 676, 765, 517, 74, 25, 974, 207, 78, 25],
  {'l': 88, 'y': 40, 'k': 85, 'n': 82, 't': 52},
  datetime(2010, 7, 15, 0, 0)],
 ['kbvjwwthli',
  [447, 937, 213, 755, 80, 177, 734, 607, 455, 318],
  {'m': 14, 't': 86, 'b': 25, 'j': 83, 'f': 56},
  datetime(2011, 7, 26, 0, 0)],
 ['rgkkrkuoax',
  [98, 868, 975, 32, 362, 192, 860, 791, 994, 223],
  {'n': 55, 'q': 30, 'a': 60, 'r': 7, 'y': 61},
  datetime(1993, 2, 15, 0, 0)],
 ['supkvavoop',
  [941, 875, 654, 545, 88, 127, 818, 170, 865, 304],
  {'x': 62, 'n': 100, 'h': 43, 'a': 87, 'l': 91},
  datetime(2018, 5, 13, 0, 0)],
 ['vbpslzfoua',
  [862, 694, 264, 171, 998, 991, 488, 95, 269, 807],
  {'n': 49, 'm': 33, 'q': 34, 'i': 77, 'v': 38},
  datetime(2023, 9, 10, 0, 0)],
 ['vahirmwvly',
  [709, 628, 688, 138, 414, 356, 257, 615, 940, 560],
  {'f': 18, 'v': 35, 'q': 66, 's': 48, 'k': 43},
  datetime(1984, 8, 10, 0, 0)]]

In [23]:
# Wrap each [name, scores, keys, date] record in a Row.
# A distinct factory name is used so this cell does not overwrite `column_names`
# from In [4]. Otherwise, re-running In [4] afterwards would silently build Rows
# with the wrong field names (hidden kernel state).
complex_row = Row('name', 'scores', 'keys', 'date')
complex_sc_values = sc.parallelize(complex_data).map(lambda record: complex_row(*record))
complex_sc_values.collect()

[Row(name='udilctmebs', scores=[988, 185, 484, 86, 734, 443, 90, 716, 988, 506], keys={'a': 54, 'n': 43, 'w': 55, 's': 2, 'h': 47}, date=datetime.datetime(2010, 11, 1, 0, 0)),
 Row(name='thwdwfeswz', scores=[936, 728, 506, 916, 816, 691, 491, 55, 528, 610], keys={'z': 6, 'h': 69, 'q': 55, 'm': 17, 'r': 13}, date=datetime.datetime(1983, 7, 13, 0, 0)),
 Row(name='miusfkowwk', scores=[371, 871, 203, 49, 657, 200, 530, 372, 302, 403], keys={'s': 4, 'o': 12, 'g': 2, 'c': 49, 'q': 4}, date=datetime.datetime(1993, 4, 1, 0, 0)),
 Row(name='zqgtupjqdh', scores=[465, 742, 847, 416, 368, 534, 81, 974, 14, 344], keys={'r': 95, 'e': 13, 'c': 14, 'w': 20, 'o': 70}, date=datetime.datetime(2001, 7, 12, 0, 0)),
 Row(name='jywktnyhsm', scores=[187, 676, 765, 517, 74, 25, 974, 207, 78, 25], keys={'l': 88, 'y': 40, 'k': 85, 'n': 82, 't': 52}, date=datetime.datetime(2010, 7, 15, 0, 0)),
 Row(name='kbvjwwthli', scores=[447, 937, 213, 755, 80, 177, 734, 607, 455, 318], keys={'m': 14, 't': 86, 'b': 25, 'j': 8

In [24]:
# Build the DataFrame directly from the raw lists, naming the columns via `schema`.
# The dicts become a map column; compare the key order in this output with In [23]:
# it is not preserved.
complex_df_values = sqlCtx.createDataFrame(
    complex_data,
    schema=['name', 'scores', 'keys', 'date'],
)
complex_df_values.collect()

[Row(name='udilctmebs', scores=[988, 185, 484, 86, 734, 443, 90, 716, 988, 506], keys={'h': 47, 'a': 54, 's': 2, 'n': 43, 'w': 55}, date=datetime.datetime(2010, 11, 1, 0, 0)),
 Row(name='thwdwfeswz', scores=[936, 728, 506, 916, 816, 691, 491, 55, 528, 610], keys={'h': 69, 'q': 55, 'r': 13, 'z': 6, 'm': 17}, date=datetime.datetime(1983, 7, 13, 0, 0)),
 Row(name='miusfkowwk', scores=[371, 871, 203, 49, 657, 200, 530, 372, 302, 403], keys={'q': 4, 'c': 49, 's': 4, 'g': 2, 'o': 12}, date=datetime.datetime(1993, 4, 1, 0, 0)),
 Row(name='zqgtupjqdh', scores=[465, 742, 847, 416, 368, 534, 81, 974, 14, 344], keys={'r': 95, 'c': 14, 'e': 13, 'w': 20, 'o': 70}, date=datetime.datetime(2001, 7, 12, 0, 0)),
 Row(name='jywktnyhsm', scores=[187, 676, 765, 517, 74, 25, 974, 207, 78, 25], keys={'y': 40, 'k': 85, 't': 52, 'l': 88, 'n': 82}, date=datetime.datetime(2010, 7, 15, 0, 0)),
 Row(name='kbvjwwthli', scores=[447, 937, 213, 755, 80, 177, 734, 607, 455, 318], keys={'b': 25, 'j': 83, 't': 86, 'm': 1

In [25]:
# show() truncates long cell values (note the "..." below); show(truncate=False) prints them in full.
complex_df_values.show()

+----------+--------------------+--------------------+-------------------+
|      name|              scores|                keys|               date|
+----------+--------------------+--------------------+-------------------+
|udilctmebs|[988, 185, 484, 8...|[h -> 47, a -> 54...|2010-11-01 00:00:00|
|thwdwfeswz|[936, 728, 506, 9...|[h -> 69, q -> 55...|1983-07-13 00:00:00|
|miusfkowwk|[371, 871, 203, 4...|[q -> 4, c -> 49,...|1993-04-01 00:00:00|
|zqgtupjqdh|[465, 742, 847, 4...|[r -> 95, c -> 14...|2001-07-12 00:00:00|
|jywktnyhsm|[187, 676, 765, 5...|[y -> 40, k -> 85...|2010-07-15 00:00:00|
|kbvjwwthli|[447, 937, 213, 7...|[b -> 25, j -> 83...|2011-07-26 00:00:00|
|rgkkrkuoax|[98, 868, 975, 32...|[a -> 60, q -> 30...|1993-02-15 00:00:00|
|supkvavoop|[941, 875, 654, 5...|[h -> 43, x -> 62...|2018-05-13 00:00:00|
|vbpslzfoua|[862, 694, 264, 1...|[q -> 34, i -> 77...|2023-09-10 00:00:00|
|vahirmwvly|[709, 628, 688, 1...|[q -> 66, s -> 48...|1984-08-10 00:00:00|
+----------+-------------

Selectively extract fields from the DataFrame's underlying RDD:

In [26]:
# Map each Row of the underlying RDD to a (name, date) tuple.
(
    complex_df_values.rdd
    .map(lambda row: (row.name, row.date))
    .collect()
)

[('udilctmebs', datetime.datetime(2010, 11, 1, 0, 0)),
 ('thwdwfeswz', datetime.datetime(1983, 7, 13, 0, 0)),
 ('miusfkowwk', datetime.datetime(1993, 4, 1, 0, 0)),
 ('zqgtupjqdh', datetime.datetime(2001, 7, 12, 0, 0)),
 ('jywktnyhsm', datetime.datetime(2010, 7, 15, 0, 0)),
 ('kbvjwwthli', datetime.datetime(2011, 7, 26, 0, 0)),
 ('rgkkrkuoax', datetime.datetime(1993, 2, 15, 0, 0)),
 ('supkvavoop', datetime.datetime(2018, 5, 13, 0, 0)),
 ('vbpslzfoua', datetime.datetime(2023, 9, 10, 0, 0)),
 ('vahirmwvly', datetime.datetime(1984, 8, 10, 0, 0))]

The DataFrame API has a built-in equivalent, `select`:

In [27]:
# Column projection with the DataFrame API instead of mapping over `.rdd`.
complex_df_values.select('name', 'scores').show()

+----------+--------------------+
|      name|              scores|
+----------+--------------------+
|udilctmebs|[988, 185, 484, 8...|
|thwdwfeswz|[936, 728, 506, 9...|
|miusfkowwk|[371, 871, 203, 4...|
|zqgtupjqdh|[465, 742, 847, 4...|
|jywktnyhsm|[187, 676, 765, 5...|
|kbvjwwthli|[447, 937, 213, 7...|
|rgkkrkuoax|[98, 868, 975, 32...|
|supkvavoop|[941, 875, 654, 5...|
|vbpslzfoua|[862, 694, 264, 1...|
|vahirmwvly|[709, 628, 688, 1...|
+----------+--------------------+



In [28]:
# Append a suffix to every name; a parenthesised chain avoids backslash continuations.
(
    complex_df_values.rdd
    .map(lambda row: row.name + "-hehe")
    .collect()
)

['udilctmebs-hehe',
 'thwdwfeswz-hehe',
 'miusfkowwk-hehe',
 'zqgtupjqdh-hehe',
 'jywktnyhsm-hehe',
 'kbvjwwthli-hehe',
 'rgkkrkuoax-hehe',
 'supkvavoop-hehe',
 'vbpslzfoua-hehe',
 'vahirmwvly-hehe']

**Adding a new column to the DataFrame with a UDF:**

In [29]:
# Python UDF that sums each row's `scores` array.
# - A null array yields null instead of raising TypeError on the executor.
# - Null elements are skipped, as SQL aggregate functions do.
# - LongType matches the element type (presumably array<bigint>: Python ints are
#   inferred as bigint elsewhere in this notebook), so a large total is not
#   squeezed into the 32-bit IntegerType.
# - `builtins.sum` replaces the CPython implementation detail `__builtins__`.
#   It stays correct even if `sum` is later shadowed (e.g. by
#   `from pyspark.sql.functions import *`).
def _sum_scores(arr):
    """Return the sum of the non-null values in ``arr``.

    Returns None when ``arr`` is None, and 0 for an empty list.
    """
    if arr is None:
        return None
    return builtins.sum(x for x in arr if x is not None)


sum_cols = F.udf(_sum_scores, LongType())
complex_df_values.select('name', 'scores').withColumn('total score', sum_cols(F.col("scores"))).show()

+----------+--------------------+-----------+
|      name|              scores|total score|
+----------+--------------------+-----------+
|udilctmebs|[988, 185, 484, 8...|       5220|
|thwdwfeswz|[936, 728, 506, 9...|       6277|
|miusfkowwk|[371, 871, 203, 4...|       3958|
|zqgtupjqdh|[465, 742, 847, 4...|       4785|
|jywktnyhsm|[187, 676, 765, 5...|       3528|
|kbvjwwthli|[447, 937, 213, 7...|       4723|
|rgkkrkuoax|[98, 868, 975, 32...|       5395|
|supkvavoop|[941, 875, 654, 5...|       5387|
|vbpslzfoua|[862, 694, 264, 1...|       5639|
|vahirmwvly|[709, 628, 688, 1...|       5305|
+----------+--------------------+-----------+

