__Apache Spark__ – это Big Data фреймворк (фреймворк для кластерных вычислений и крупномасштабной обработки данных) с открытым исходным кодом для распределённой пакетной и потоковой обработки неструктурированных и слабоструктурированных данных, входящий в экосистему проектов Hadoop.

![](https://www.bigdataschool.ru/wp-content/uploads/2019/10/spark0.png)

Классический MapReduce, Apache компонент Hadoop для обработки данных, проводит вычисления в два этапа:

1. _Map_, когда главный узел кластера (master) распределяет задачи по рабочим узлам (node)
2. _Reduce_, когда данные сворачиваются и передаются обратно на главный узел, формируя окончательный результат вычислений.

![MapReduce](https://www.bigdataschool.ru/wp-content/uploads/2019/10/MapReduce0.png)

Пока все процессы этапа Map не закончатся, процессы Reduce не начнутся. При этом все операции проходят по циклу чтение-запись с жесткого диска. Это обусловливает задержки в обработке информации. Таким образом, технология MapReduce хорошо подходит для задач распределенных вычислений в пакетном режиме, но из-за задержек (latency) не может использоваться для потоковой обработки в режиме реального времени.

__Для решения этой проблемы был создан Apache Spark__ и другие Big Data фреймворки распределенной потоковой обработки (Storm, Samza, Flink).

В отличие от классического обработчика ядра Apache Hadoop c двухуровневой концепцией MapReduce на базе дискового хранилища, Spark использует специализированные примитивы для ___рекуррентной обработки в оперативной памяти___. Благодаря этому многие вычислительные задачи реализуются в Spark ___значительно быстрее___. Например, возможность многократного доступа к загруженным в память пользовательским данным позволяет эффективно работать с алгоритмами машинного обучения (Machine Learning).

![](https://www.bigdataschool.ru/wp-content/uploads/2019/10/spark1.png)

Благодаря наличию разнопрофильных инструментов для аналитической обработки данных «на лету» (SQL, Streaming, MLLib, GraphX), Apache Spark активно используется в решении задач обработки Больших данных, в системах интернета вещей (Internet of Things, IoT) на стороне IoT-платформ, а также в различных бизнес-приложениях, в т.ч. на базе методов Machine Learning (аналитика Больших данных).

## RDD - Устойчивые распределённые наборы данных

RDD – строительные блоки Spark: всё состоит из них. Даже высокоуровневые Spark API (DataFrames, Datasets) состоят из RDD под капотом. Что значит быть устойчивым распределённым набором данных?

* __R__esilient – Устойчивый: поскольку Spark работает на кластере компьютеров, потеря данных из-за аппаратного сбоя представляет собой серьёзную проблему, поэтому RDD отказоустойчивые и восстанавливаются в случае сбоя.
* __D__istributed – Распределённый: один RDD хранится на нескольких узлах кластера, которые не принадлежат одному источнику (и одной точке отказа). Таким образом, кластер оперирует RDD параллельно.
* __D__ataset – Набор данных: коллекция значений – вы наверняка уже знали это.

Данные, с которыми мы работаем в Spark, хранятся в той или иной форме в RDD.

## Spark и ленивые вычисления

Spark определяет набор API для работы с RDD, которые разбиты на две большие группы: Трансформации и Действия.

__Трансформации__ создают новый RDD из существующего. К ним относятся:
* select
* withColumn
* filter
* oderBy
* sort
* different
* dropDuplicates
* join
* goupBy

__Действия__ возвращают значение или значения программе-драйверу после выполнения вычисления над RDD. К ним относятся:
* collect
* count
* take
* top
* foreach

<span style="color: red;">__Трансформации в Spark «ленивые»__</span>. Это означает, что, когда сообщаем Spark о создании RDD с помощью трансформаций существующего RDD, он ***не будет генерировать этот набор данных, пока не выполнится действие*** над ним или его дочерним элементом. Затем Spark выполнит трансформацию и действие, которое её запустило. Поэтому Spark работает намного эффективнее.

## PySpark

Apache Spark реализован на языке программирования Scala, который выполняется на JVM (Java Virtual Machine). __Чтобы получить функциональность Spark в Python, используется PySpark__. Поэтому те, кто не знаком со Scala, но знаком с Python, могут запросто использовать возможности фрейвморка Apache Spark.

PySpark взаимодействует с самим Spark через специальную библиотеку Py4J. Она позволяет программам Python, которые выполняются интерпретатором, динамически обращаться к объектам Java в JVM, транслируя код Scala в JVM. Для большей совместимости PySpark поддерживает парадигму функционального программирования, поскольку:
* Язык Scala — функциональный.
* Функциональный код намного проще распараллелить.

Таким образом, PySpark позволяет проводить параллельную обработку без необходимости использования каких-либо модулей Python для потоковой или многопроцессорной обработки. Вся сложная коммуникация и синхронизация между потоками, процессами и даже разными CPU обрабатываются в Spark.

# Подготовка

## Установка

In [None]:
# в colab можно ставить на систему, в jhub.jinr.ru - не разрешит
!apt-get install -q openjdk-8-jdk-headless

In [None]:
# качаем спарк
!wget -c https://apache-mirror.rbc.ru/pub/apache/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz

--2023-05-24 10:54:48--  https://apache-mirror.rbc.ru/pub/apache/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz
Resolving apache-mirror.rbc.ru (apache-mirror.rbc.ru)... 80.68.250.216
Connecting to apache-mirror.rbc.ru (apache-mirror.rbc.ru)|80.68.250.216|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 388407094 (370M) [application/octet-stream]
Saving to: ‘spark-3.4.0-bin-hadoop3.tgz’


2023-05-24 10:55:20 (12.3 MB/s) - ‘spark-3.4.0-bin-hadoop3.tgz’ saved [388407094/388407094]



In [None]:
# если ещё не извлекали - разархивируем
!if [ ! -d spark-3.4.0-bin-hadoop3 ]; then tar -xzf spark-3.4.0-bin-hadoop3.tgz; fi

In [None]:
!pip install findspark # в jhub: pip install --user findspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


## Запуск

In [None]:
import os

# чтобы спарк мог запускаться
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# чтобы скрипт нашёл библиотеки спарка
os.environ["SPARK_HOME"] = "/content/spark-3.4.0-bin-hadoop3"

# в jhub:
# os.environ["JAVA_HOME"] = "/zfs/store5.hydra.local/user/i/ikadochn/java_v8u181"
# os.environ["SPARK_HOME"] = f"{os.getcwd()}/spark-3.2.1-bin-hadoop3.2"

In [None]:
import findspark
findspark.init()

In [None]:
# сессия нового (SQL-подобного) API к Spark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[4]").getOrCreate()
spark

In [None]:
import pyspark
# а можно так
conf = pyspark.conf.SparkConf()
conf.setMaster("local[4]")
sc = pyspark.SparkContext.getOrCreate(conf)
sc

# Spark DataFrame

## Создание из локальных данных

In [None]:
from pyspark.sql import Row

df = spark.createDataFrame([
    Row(a=1, b=2., c='string1'),
    Row(a=2, b=3., c='string2'),
    Row(a=4, b=5., c='string3')
])

df.show()

+---+---+-------+
|  a|  b|      c|
+---+---+-------+
|  1|2.0|string1|
|  2|3.0|string2|
|  4|5.0|string3|
+---+---+-------+



![](https://web.archive.org/web/20210719202944if_/https://miro.medium.com/max/1400/0*n3xLxUkG9G2gczCq.png)

Расширенный вывод при описании плана запроса (`df.explain(true)`) позволяет отследить все стадии, которые проходит запрос:

* Parsed Logical Plan — получаем после синтаксического разбора SQL. На этом этапе проверяется только синтаксическая корректность запроса.
* Analyzed Logical Plan — на этом этапе добавляется информация о структуре используемых сущностей, проверяется соответствие структуры и запрашиваемых атрибутов.
* Optimized Logical Plan — на данном этапе происходит преобразование получившегося дерева запроса на основании доступных правил оптимизации.
* Physical Plan — начинают учитываться особенности доступа к исходным данным, включая оптимизации по фильтрации партиций и данных для минимизации получаемого набора данных.


In [None]:
help(df.explain)

Help on method explain in module pyspark.sql.dataframe:

explain(extended: Union[bool, str, NoneType] = None, mode: Optional[str] = None) -> None method of pyspark.sql.dataframe.DataFrame instance
    Prints the (logical and physical) plans to the console for debugging purposes.
    
    .. versionadded:: 1.3.0
    
    .. versionchanged:: 3.4.0
        Supports Spark Connect.
    
    Parameters
    ----------
    extended : bool, optional
        default ``False``. If ``False``, prints only the physical plan.
        When this is a string without specifying the ``mode``, it works as the mode is
        specified.
    mode : str, optional
        specifies the expected output format of plans.
    
        * ``simple``: Print only a physical plan.
        * ``extended``: Print both logical and physical plans.
        * ``codegen``: Print a physical plan and generated codes if they are available.
        * ``cost``: Print a logical plan and statistics if they are available.
        * ``

In [None]:
df.explain()

== Physical Plan ==
*(1) Scan ExistingRDD[a#0L,b#1,c#2]




In [None]:
df.explain(True)

== Parsed Logical Plan ==
LogicalRDD [a#0L, b#1, c#2], false

== Analyzed Logical Plan ==
a: bigint, b: double, c: string
LogicalRDD [a#0L, b#1, c#2], false

== Optimized Logical Plan ==
LogicalRDD [a#0L, b#1, c#2], false

== Physical Plan ==
*(1) Scan ExistingRDD[a#0L,b#1,c#2]



[Хороший пример](https://docs.aws.amazon.com/athena/latest/ug/notebooks-spark-troubleshooting-explain.html), демонстрирующий, как можно использовать `df.explain()` для отладки SQL-запросов

Создание df из списка кортежей

In [None]:
df = spark.createDataFrame([
    (1, 2., 'string1'),
    (2, 3., 'string2'),
    (3, 4., 'string3')
], schema='a long, b double, c string')

df.show()

+---+---+-------+
|  a|  b|      c|
+---+---+-------+
|  1|2.0|string1|
|  2|3.0|string2|
|  3|4.0|string3|
+---+---+-------+



SparkSession.createDataFrame, который используется «под капотом», требует

__RDD__/__списка Row/tuple/list__/~dict~

или

__pandas.DataFrame__

если не предоставлена ​​схема с DataType.

In [None]:
try:
  df = spark.createDataFrame({
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
  })
  df.show()
except Exception as e:
  print(e)

Can not infer schema for type: <class 'str'>


In [None]:
pandas_df = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
})
df = spark.createDataFrame(pandas_df)
df.show()

+---+---+-------+
|  a|  b|      c|
+---+---+-------+
|  1|2.0|string1|
|  2|3.0|string2|
|  3|4.0|string3|
+---+---+-------+



In [None]:
df

DataFrame[a: bigint, b: double, c: string]

In [None]:
sc.parallelize([1,2,3])

ParallelCollectionRDD[21] at readRDDFromFile at PythonRDD.scala:274

Вариант создания df с описанной схемой

[Какие типы бывают у столбиков](https://spark.apache.org/docs/2.0.1/api/python/_modules/pyspark/sql/types.html)

In [None]:
from pyspark.sql.types import *

rdd = sc.parallelize([
    (1, 2., 'string1'),
    (2, 3., 'string2'),
    (3, 4., 'string3')
])

schema = StructType([
    StructField("a", LongType(), False),
    StructField("b", DoubleType(), False),
    StructField("c", StringType(), False),
])

df = spark.createDataFrame(rdd, schema=schema)
df.show()
rdd

+---+---+-------+
|  a|  b|      c|
+---+---+-------+
|  1|2.0|string1|
|  2|3.0|string2|
|  3|4.0|string3|
+---+---+-------+



ParallelCollectionRDD[31] at readRDDFromFile at PythonRDD.scala:274

In [None]:
df.printSchema()

root
 |-- a: long (nullable = false)
 |-- b: double (nullable = false)
 |-- c: string (nullable = false)



## Просмотр, схема, сбор на клиенте

In [None]:
df.show(n=1)

+---+---+-------+
|  a|  b|      c|
+---+---+-------+
|  1|2.0|string1|
+---+---+-------+
only showing top 1 row



In [None]:
df.show(2, vertical=True)

-RECORD 0------
 a   | 1       
 b   | 2.0     
 c   | string1 
-RECORD 1------
 a   | 2       
 b   | 3.0     
 c   | string2 
only showing top 2 rows



In [None]:
df.columns

['a', 'b', 'c']

In [None]:
df.schema

StructType(List(StructField(a,LongType,false),StructField(b,DoubleType,false),StructField(c,StringType,false)))

In [None]:
df.printSchema()

root
 |-- a: long (nullable = false)
 |-- b: double (nullable = false)
 |-- c: string (nullable = false)



In [None]:
df.take(2), df.head(2)

([Row(a=1, b=2.0, c='string1'), Row(a=2, b=3.0, c='string2')],
 [Row(a=1, b=2.0, c='string1'), Row(a=2, b=3.0, c='string2')])

In [None]:
df.rdd.collect()

[Row(a=1, b=2.0, c='string1'),
 Row(a=2, b=3.0, c='string2'),
 Row(a=3, b=4.0, c='string3')]

In [None]:
df.collect()

[Row(a=1, b=2.0, c='string1'),
 Row(a=2, b=3.0, c='string2'),
 Row(a=3, b=4.0, c='string3')]

In [None]:
df.toPandas()

Unnamed: 0,a,b,c
0,1,2.0,string1
1,2,3.0,string2
2,3,4.0,string3


In [None]:
df.printSchema()
df.toPandas().info()

root
 |-- a: long (nullable = false)
 |-- b: double (nullable = false)
 |-- c: string (nullable = false)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3 entries, 0 to 2
Data columns (total 3 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   a       3 non-null      int64  
 1   b       3 non-null      float64
 2   c       3 non-null      object 
dtypes: float64(1), int64(1), object(1)
memory usage: 200.0+ bytes


In [None]:
df.toPandas().describe()

Unnamed: 0,a,b
count,3.0,3.0
mean,2.0,3.0
std,1.0,1.0
min,1.0,2.0
25%,1.5,2.5
50%,2.0,3.0
75%,2.5,3.5
max,3.0,4.0


In [None]:
df.describe().show()

+-------+---+---+-------+
|summary|  a|  b|      c|
+-------+---+---+-------+
|  count|  3|  3|      3|
|   mean|2.0|3.0|   null|
| stddev|1.0|1.0|   null|
|    min|  1|2.0|string1|
|    max|  3|4.0|string3|
+-------+---+---+-------+



In [None]:
df.summary().show()

+-------+---+---+-------+
|summary|  a|  b|      c|
+-------+---+---+-------+
|  count|  3|  3|      3|
|   mean|2.0|3.0|   null|
| stddev|1.0|1.0|   null|
|    min|  1|2.0|string1|
|    25%|  1|2.0|   null|
|    50%|  2|3.0|   null|
|    75%|  3|4.0|   null|
|    max|  3|4.0|string3|
+-------+---+---+-------+



# SQL-подобные запросы

## Столбцы датафрейма

In [None]:
df.select('a', 'c').show()

+---+-------+
|  a|      c|
+---+-------+
|  1|string1|
|  2|string2|
|  3|string3|
+---+-------+



In [None]:
df.filter(df.a < 3).show()
df.filter(2 * df.a == df.b).show()

+---+---+-------+
|  a|  b|      c|
+---+---+-------+
|  1|2.0|string1|
|  2|3.0|string2|
+---+---+-------+

+---+---+-------+
|  a|  b|      c|
+---+---+-------+
|  1|2.0|string1|
+---+---+-------+



In [None]:
df.a

Column<'a'>

In [None]:
df.select(df.a, df.c)

DataFrame[a: bigint, c: string]

Создание нового столбика на основе применения какой-то функции

In [None]:
from pyspark.sql.functions import upper

df.withColumn('upper_c', upper(df.c)).show()

+---+---+-------+-------+
|  a|  b|      c|upper_c|
+---+---+-------+-------+
|  1|2.0|string1|STRING1|
|  2|3.0|string2|STRING2|
|  3|4.0|string3|STRING3|
+---+---+-------+-------+



In [None]:
df.select(df.a * df.b).show()

+-------+
|(a * b)|
+-------+
|    2.0|
|    6.0|
|   12.0|
+-------+



In [None]:
q = df.select(df.a.alias('a1'), df.a.alias('a2'), 'c')
q.show()
q.filter(q.a1<3).show()

+---+---+-------+
| a1| a2|      c|
+---+---+-------+
|  1|  1|string1|
|  2|  2|string2|
|  3|  3|string3|
+---+---+-------+

+---+---+-------+
| a1| a2|      c|
+---+---+-------+
|  1|  1|string1|
|  2|  2|string2|
+---+---+-------+



In [None]:
from pyspark.sql.functions import col

df.select(col('a')).show()

+---+
|  a|
+---+
|  1|
|  2|
|  3|
+---+



In [None]:
df.select(df.a.alias('a1'), df.a.alias('a2'), 'c').where(col('a1')<3).show()

+---+---+-------+
| a1| a2|      c|
+---+---+-------+
|  1|  1|string1|
|  2|  2|string2|
+---+---+-------+



In [None]:
df.filter(col('b') < 'a').show()

+---+---+---+
|  a|  b|  c|
+---+---+---+
+---+---+---+



## Юнион (конкатенация по строкам)

In [None]:
train = pd.DataFrame({"x1": range(3),
                   "y": ["a", "b", "a"],
                   "x2": reversed(range(3)),
                   "x3": 0})
train

Unnamed: 0,x1,y,x2,x3
0,0,a,2,0
1,1,b,1,0
2,2,a,0,0


In [None]:
test = pd.DataFrame({"x1": [2, 3],
                   "x2": [1, 2],
                   "x3": 0})
test

Unnamed: 0,x1,x2,x3
0,2,1,0
1,3,2,0


In [None]:
pd.concat([train, test], ignore_index=True)

Unnamed: 0,x1,y,x2,x3
0,0,a,2,0
1,1,b,1,0
2,2,a,0,0
3,2,,1,0
4,3,,2,0


Метод `union()` объединяет два фрейма данных и возвращает новый фрейм данных со всеми строками из двух фреймов данных независимо от дублирующихся данных.

In [None]:
tr = spark.createDataFrame(train)
te = spark.createDataFrame(test)
try:
  tr.union(te).show()
except Exception as e:
  print(e)

Union can only be performed on tables with the same number of columns, but the first table has 4 columns and the second table has 3 columns;
'Union false, false
:- LogicalRDD [x1#979L, y#980, x2#981L, x3#982L], false
+- LogicalRDD [x1#987L, x2#988L, x3#989L], false



Создаём отсутствующий столбик и объединяем оба df

In [None]:
from pyspark.sql.functions import lit
tr.union(te.withColumn('y', lit(None))).show()

+---+---+---+----+
| x1|  y| x2|  x3|
+---+---+---+----+
|  0|  a|  2|   0|
|  1|  b|  1|   0|
|  2|  a|  0|   0|
|  2|  1|  0|null|
|  3|  2|  0|null|
+---+---+---+----+



In [None]:
te.columns

['x1', 'x2', 'x3']

In [None]:
te.select(te.x1, lit(None), 'x2', col('x3')).show()

+---+----+---+---+
| x1|NULL| x2| x3|
+---+----+---+---+
|  2|null|  1|  0|
|  3|null|  2|  0|
+---+----+---+---+



In [None]:
tr.union(te.select(te.x1, lit(None), 'x2', col('x3'))).show()

+---+----+---+---+
| x1|   y| x2| x3|
+---+----+---+---+
|  0|   a|  2|  0|
|  1|   b|  1|  0|
|  2|   a|  0|  0|
|  2|null|  1|  0|
|  3|null|  2|  0|
+---+----+---+---+



In [None]:
tr.drop('y').union(te).show()

+---+---+---+
| x1| x2| x3|
+---+---+---+
|  0|  2|  0|
|  1|  1|  0|
|  2|  0|  0|
|  2|  1|  0|
|  3|  2|  0|
+---+---+---+



## Join
Объединяет 2 датафрейма по заданному условию

In [None]:
user = pd.DataFrame({"name": ["admin", "guest"],
                      "id": [1, 123]})
user

Unnamed: 0,name,id
0,admin,1
1,guest,123


In [None]:
n = 10
log = pd.DataFrame({"uid": np.random.choice([1, 123], n),
                    "result": np.random.choice(["done", "error"], n),
                    "time": np.arange(n)})
log

Unnamed: 0,uid,result,time
0,1,error,0
1,1,error,1
2,1,error,2
3,123,error,3
4,1,error,4
5,1,done,5
6,1,error,6
7,1,done,7
8,123,error,8
9,1,error,9


In [None]:
log.join(user.set_index('id'), on='uid')

Unnamed: 0,uid,result,time,name
0,1,error,0,admin
1,1,error,1,admin
2,1,error,2,admin
3,123,error,3,guest
4,1,error,4,admin
5,1,done,5,admin
6,1,error,6,admin
7,1,done,7,admin
8,123,error,8,guest
9,1,error,9,admin


In [None]:
users = spark.createDataFrame(user)
logs = spark.createDataFrame(log)
users.withColumnRenamed('id', 'uid').join(logs, 'uid').show()

+---+-----+------+----+
|uid| name|result|time|
+---+-----+------+----+
|  1|admin| error|   0|
|  1|admin| error|   1|
|  1|admin| error|   2|
|  1|admin| error|   4|
|  1|admin|  done|   5|
|  1|admin| error|   6|
|  1|admin|  done|   7|
|  1|admin| error|   9|
|123|guest| error|   3|
|123|guest| error|   8|
+---+-----+------+----+



In [None]:
users.join(logs, users.id==logs.uid).show()

+-----+---+---+------+----+
| name| id|uid|result|time|
+-----+---+---+------+----+
|admin|  1|  1| error|   0|
|admin|  1|  1| error|   1|
|admin|  1|  1| error|   2|
|admin|  1|  1| error|   4|
|admin|  1|  1|  done|   5|
|admin|  1|  1| error|   6|
|admin|  1|  1|  done|   7|
|admin|  1|  1| error|   9|
|guest|123|123| error|   3|
|guest|123|123| error|   8|
+-----+---+---+------+----+



## UDF - user-defined functions

In [None]:
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf('bigint')
def pandas_plus_one(series: pd.Series) -> pd.Series:
    return series + 1

df.select(pandas_plus_one(df.a)).show()

+------------------+
|pandas_plus_one(a)|
+------------------+
|                 2|
|                 3|
|                 5|
+------------------+



In [None]:
[pandas_plus_one(col(c)) for c in df.columns]

[Column<'pandas_plus_one(a)'>,
 Column<'pandas_plus_one(b)'>,
 Column<'pandas_plus_one(c)'>]

In [None]:
from pyspark.sql.types import StringType
StringType.typeName()

'string'

In [None]:
@pandas_udf('string')
def pandas_to_str(series: pd.Series) -> pd.Series:
    return series.astype('str').str.pad(10, 'left', '_')

df.select([pandas_to_str(col(c)) for c in df.columns]).show()

+----------------+----------------+----------------+
|pandas_to_str(a)|pandas_to_str(b)|pandas_to_str(c)|
+----------------+----------------+----------------+
|      _________1|      _______2.0|      ___string1|
|      _________2|      _______3.0|      ___string2|
|      _________3|      _______4.0|      ___string3|
+----------------+----------------+----------------+



In [None]:
def pandas_filter_func(iterator):
    for pandas_df in iterator:
        yield pandas_df[pandas_df.a == 1]

df.mapInPandas(pandas_filter_func, schema=df.schema).show()

+---+---+-------+
|  a|  b|      c|
+---+---+-------+
|  1|2.0|string1|
+---+---+-------+



In [None]:
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

slen = udf(lambda s: len(s), IntegerType())

@udf
def to_upper(s):
    if s is not None:
        return s.upper()

@udf(returnType=IntegerType())
def add_one(x):
    if x is not None:
        return x + 1

df_docs = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
df_docs.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")).show()

+----------+--------------+------------+
|slen(name)|to_upper(name)|add_one(age)|
+----------+--------------+------------+
|         8|      JOHN DOE|          22|
+----------+--------------+------------+



## Группировка данных

In [None]:
fruit = spark.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])
fruit.show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



In [None]:
fruit.groupby('color').avg().show()

+-----+-------+-------+
|color|avg(v1)|avg(v2)|
+-----+-------+-------+
|  red|    4.8|   48.0|
| blue|    3.0|   30.0|
|black|    6.0|   60.0|
+-----+-------+-------+



In [None]:
def plus_mean(pandas_df):
    return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())  # assign переназначает данные в столбце

fruit.groupby('color').applyInPandas(plus_mean, schema=fruit.schema).show()  # applyInPandas - для GroupedData

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|black|carrot|  0| 60|
| blue|banana| -1| 20|
| blue| grape|  1| 40|
|  red|banana| -3| 10|
|  red|carrot| -1| 30|
|  red|carrot|  0| 50|
|  red|banana|  2| 70|
|  red| grape|  3| 80|
+-----+------+---+---+



## SQL

In [None]:
fruit.createOrReplaceTempView("tableA")
spark.sql("SELECT * from tableA").show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



In [None]:
spark.sql("SELECT * from tableA").explain()

== Physical Plan ==
*(1) Scan ExistingRDD[color#1353,fruit#1354,v1#1355L,v2#1356L]




In [None]:
print(spark.sql("EXPLAIN SELECT * from tableA").collect()[0])  # первая строка

Row(plan='== Physical Plan ==\n*(1) Scan ExistingRDD[color#1353,fruit#1354,v1#1355L,v2#1356L]\n\n')


In [None]:
spark.sql("SELECT count(*) from tableA").show()

+--------+
|count(1)|
+--------+
|       8|
+--------+



In [None]:
@pandas_udf("integer")
def add_one(s: pd.Series) -> pd.Series:
    return s + 1

spark.udf.register("add_one", add_one)
spark.sql("SELECT add_one(v1) FROM tableA").show()

+-----------+
|add_one(v1)|
+-----------+
|          2|
|          3|
|          4|
|          5|
|          6|
|          7|
|          8|
|          9|
+-----------+



In [None]:
from pyspark.sql.functions import expr

fruit.selectExpr('add_one(v1)').show()
fruit.select(expr('count(*)') > 0).show()

+-----------+
|add_one(v1)|
+-----------+
|          2|
|          3|
|          4|
|          5|
|          6|
|          7|
|          8|
|          9|
+-----------+

+--------------+
|(count(1) > 0)|
+--------------+
|          true|
+--------------+



# Ввод-вывод

## Запись и чтение результата

### CSV

In [None]:
df.write.csv('foo.csv', header=True)

In [None]:
ls -lh foo.csv

total 16K
-rw-r--r-- 1 root root  6 May 24 11:00 part-00000-952ffc0e-b777-444b-8abe-37f5c14b3f3a-c000.csv
-rw-r--r-- 1 root root 20 May 24 11:00 part-00001-952ffc0e-b777-444b-8abe-37f5c14b3f3a-c000.csv
-rw-r--r-- 1 root root 20 May 24 11:00 part-00002-952ffc0e-b777-444b-8abe-37f5c14b3f3a-c000.csv
-rw-r--r-- 1 root root 20 May 24 11:00 part-00003-952ffc0e-b777-444b-8abe-37f5c14b3f3a-c000.csv
-rw-r--r-- 1 root root  0 May 24 11:00 _SUCCESS


In [None]:
df.rdd.getNumPartitions()

4

In [None]:
cat foo.csv/part-00001-*.csv

a,b,c
1,2.0,string1


In [None]:
spark.read.csv('foo.csv/part-00001*.csv', header=True).show()

+---+---+-------+
|  a|  b|      c|
+---+---+-------+
|  1|2.0|string1|
+---+---+-------+



In [None]:
spark.read.csv('foo.csv', header=True).show()

+---+---+-------+
|  a|  b|      c|
+---+---+-------+
|  1|2.0|string1|
|  2|3.0|string2|
|  4|5.0|string3|
+---+---+-------+



In [None]:
!hd foo.csv/part-00001*.csv

00000000  61 2c 62 2c 63 0a 31 2c  32 2e 30 2c 73 74 72 69  |a,b,c.1,2.0,stri|
00000010  6e 67 31 0a                                       |ng1.|
00000014


In [None]:
df.repartition(3).rdd.getNumPartitions()

3

`df.coalesce(num_partitions)` - усовершенствованный аналог `df.repartition(num_partitions)`:

In [None]:
df.coalesce(1).rdd.getNumPartitions()

1

In [None]:
df.coalesce(1).write.csv('foo_1part.csv', header=True)
!ls -lh foo_1part.csv

total 4.0K
-rw-r--r-- 1 root root 48 May 24 11:02 part-00000-c12abad8-f10b-4a21-a0cd-c578d9f87670-c000.csv
-rw-r--r-- 1 root root  0 May 24 11:02 _SUCCESS


In [None]:
spark.read.csv('foo_1part.csv', header=True).show()

+---+---+-------+
|  a|  b|      c|
+---+---+-------+
|  1|2.0|string1|
|  2|3.0|string2|
|  4|5.0|string3|
+---+---+-------+



### [Parquet](https://bigdataschool.ru/wiki/parquet)
[Отличия Parquet от CSV](https://medium.com/ssense-tech/csv-vs-parquet-vs-avro-choosing-the-right-tool-for-the-right-job-79c9f56914a8)

In [None]:
df.repartition(3).write.parquet('bar.parquet')

In [None]:
ls -lh bar.parquet

total 12K
-rw-r--r-- 1 root root 945 May  5 12:55 part-00000-cba327cc-449b-4a4c-8381-ca0e9501eff3-c000.snappy.parquet
-rw-r--r-- 1 root root 945 May  5 12:55 part-00001-cba327cc-449b-4a4c-8381-ca0e9501eff3-c000.snappy.parquet
-rw-r--r-- 1 root root 943 May  5 12:55 part-00002-cba327cc-449b-4a4c-8381-ca0e9501eff3-c000.snappy.parquet
-rw-r--r-- 1 root root   0 May  5 12:55 _SUCCESS


In [None]:
spark.read.parquet('bar.parquet/part-00000-*.parquet').show()

+---+---+-------+
|  a|  b|      c|
+---+---+-------+
|  3|4.0|string3|
+---+---+-------+



In [None]:
spark.read.parquet('bar.parquet').show()

+---+---+-------+
|  a|  b|      c|
+---+---+-------+
|  1|2.0|string1|
|  3|4.0|string3|
|  2|3.0|string2|
+---+---+-------+



## Текстовый формат

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
textFile = spark.read.text("/content/drive/MyDrive/АБД/Задания/11. Обработка данных в Apache Spark/data/test.txt")

In [None]:
textFile.repartition(4).explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Exchange RoundRobinPartitioning(4), REPARTITION_BY_NUM, [plan_id=213]
   +- FileScan text [value#185] Batched: false, DataFilters: [], Format: Text, Location: InMemoryFileIndex(1 paths)[file:/content/drive/MyDrive/АБД/Задания/11. Обработка данных в Apache ..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:string>




In [None]:
textFile

DataFrame[value: string]

In [None]:
textFile.count()

3

In [None]:
textFile.first()

Row(value='Таким образом сложившаяся структура организации играет важную роль в формировании направлений прогрессивного развития.')

In [None]:
linesWithSpark = textFile.filter(textFile.value.contains("образом"))
linesWithSpark

DataFrame[value: string]

In [None]:
linesWithSpark.show(truncate=False)

+----------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                 |
+----------------------------------------------------------------------------------------------------------------------+
|Таким образом сложившаяся структура организации играет важную роль в формировании направлений прогрессивного развития.|
|Таким образом консультация с широким активом позволяет оценить значение направлений прогрессивного развития.          |
+----------------------------------------------------------------------------------------------------------------------+



In [None]:
linesWithSpark.rdd.getNumPartitions()

1

In [None]:
linesWithSpark.write.text("text.txt")

In [None]:
ls -lh text.txt

total 4.0K
-rw-r--r-- 1 root root 429 May  5 13:15 part-00000-7bd056d4-edc4-49c7-98df-cb69dea72987-c000.txt
-rw-r--r-- 1 root root   0 May  5 13:15 _SUCCESS


In [None]:
!head text.txt/part*.txt

Таким образом сложившаяся структура организации играет важную роль в формировании направлений прогрессивного развития.
Таким образом консультация с широким активом позволяет оценить значение направлений прогрессивного развития.


In [None]:
linesWithSpark.write.csv("text.csv", header=True)

In [None]:
ls -lh text.csv

total 4.0K
-rw-r--r-- 1 root root 435 May  5 13:16 part-00000-a6171931-e38e-4692-b354-e83b854e671a-c000.csv
-rw-r--r-- 1 root root   0 May  5 13:16 _SUCCESS


In [None]:
!head text.csv/part*.csv

value
Таким образом сложившаяся структура организации играет важную роль в формировании направлений прогрессивного развития.
Таким образом консультация с широким активом позволяет оценить значение направлений прогрессивного развития.


In [None]:
rm -r text.csv

## Кеширование

https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.StorageLevel.html

`StorageLevel` - флаги для управления хранилищем RDD. На каждом уровне хранилища записывается, следует ли записывать RDD на диск, если ему не хватает памяти, следует ли сохранять данные в памяти в сериализованном формате и следует ли реплицировать разделы RDD на нескольких узлах. Также содержит статические константы для некоторых часто используемых уровней хранения, MEMORY_ONLY. Поскольку данные всегда сериализуются на стороне Python, все константы используют сериализованные форматы.

```
DISK_ONLY

DISK_ONLY_2

DISK_ONLY_3

MEMORY_AND_DISK

MEMORY_AND_DISK_2

MEMORY_AND_DISK_DESER

MEMORY_ONLY

MEMORY_ONLY_2

NONE

OFF_HEAP
```

In [None]:
linesWithSpark.storageLevel

StorageLevel(False, False, False, False, 1)

In [None]:
help(linesWithSpark.cache)

Help on method cache in module pyspark.sql.dataframe:

cache() -> 'DataFrame' method of pyspark.sql.dataframe.DataFrame instance
    Persists the :class:`DataFrame` with the default storage level (`MEMORY_AND_DISK`).
    
    .. versionadded:: 1.3.0
    
    .. versionchanged:: 3.4.0
        Supports Spark Connect.
    
    Notes
    -----
    The default storage level has changed to `MEMORY_AND_DISK` to match Scala in 2.0.
    
    Returns
    -------
    :class:`DataFrame`
        Cached DataFrame.
    
    Examples
    --------
    >>> df = spark.range(1)
    >>> df.cache()
    DataFrame[id: bigint]
    
    >>> df.explain()
    == Physical Plan ==
    InMemoryTableScan ...



In [None]:
linesWithSpark.cache()

DataFrame[value: string]

In [None]:
linesWithSpark.storageLevel

StorageLevel(True, True, False, True, 1)

`df.persist()` позволяет указать уровень кэширования в отличие от `df.cache()`

In [None]:
spark.catalog.clearCache()  # почистим кэш

In [None]:
from pyspark import StorageLevel
linesWithSpark.persist(StorageLevel.MEMORY_ONLY)

DataFrame[value: string]

In [None]:
linesWithSpark.storageLevel

StorageLevel(False, True, False, False, 1)

# Реальный датасет

In [None]:
ch_train = spark.read.csv('/content/sample_data/california_housing_train.csv', header=True)
ch_test = spark.read.csv('/content/sample_data/california_housing_test.csv', header=True)

In [None]:
ch_train.show(n=2)

+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population|households|median_income|median_house_value|
+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+
|-114.310000|34.190000|         15.000000|5612.000000|   1283.000000|1015.000000|472.000000|     1.493600|      66900.000000|
|-114.470000|34.400000|         19.000000|7650.000000|   1901.000000|1129.000000|463.000000|     1.820000|      80100.000000|
+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+
only showing top 2 rows



In [None]:
ch_test.show(n=2)

+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population|households|median_income|median_house_value|
+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+
|-122.050000|37.370000|         27.000000|3885.000000|    661.000000|1537.000000|606.000000|     6.608500|     344700.000000|
|-118.300000|34.260000|         43.000000|1510.000000|    310.000000| 809.000000|277.000000|     3.599000|     176500.000000|
+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+
only showing top 2 rows



In [None]:
ch_train.count(), ch_test.count(), ch_train.count() + ch_test.count()

(17000, 3000, 20000)

In [None]:
ch = ch_train.union(ch_test)
ch.count()

20000

In [None]:
ch.printSchema()

root
 |-- longitude: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- housing_median_age: string (nullable = true)
 |-- total_rooms: string (nullable = true)
 |-- total_bedrooms: string (nullable = true)
 |-- population: string (nullable = true)
 |-- households: string (nullable = true)
 |-- median_income: string (nullable = true)
 |-- median_house_value: string (nullable = true)



In [None]:
ch.selectExpr("cast(longitude as float) longitude",
    "cast(latitude as float) latitude",
    "cast(housing_median_age as float) housing_median_age",
    "cast(total_rooms as float) total_rooms",
    "cast(total_bedrooms as float) total_bedrooms",
    "cast(population as float) population",
    "cast(households as float) households",
    "cast(median_income as float) median_income",
    "cast(median_house_value as float) median_house_value").printSchema()

root
 |-- longitude: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- housing_median_age: float (nullable = true)
 |-- total_rooms: float (nullable = true)
 |-- total_bedrooms: float (nullable = true)
 |-- population: float (nullable = true)
 |-- households: float (nullable = true)
 |-- median_income: float (nullable = true)
 |-- median_house_value: float (nullable = true)



In [None]:
from pyspark.sql.types import FloatType

ch.select(*[ch[c].cast(FloatType()) for c in ch.columns]).printSchema()

root
 |-- longitude: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- housing_median_age: float (nullable = true)
 |-- total_rooms: float (nullable = true)
 |-- total_bedrooms: float (nullable = true)
 |-- population: float (nullable = true)
 |-- households: float (nullable = true)
 |-- median_income: float (nullable = true)
 |-- median_house_value: float (nullable = true)



In [None]:
ch_train_location = "sample_data/california_housing_train.csv"
ch_test_location = "sample_data/california_housing_test.csv"

reader = spark.read.format("csv") \
  .option("inferSchema", True) \
  .option("header", True) \
  .option("sep", ",")
  
ch_train = reader.load(ch_train_location)
ch_test = reader.load(ch_test_location)
 
ch_train.show(3)
ch_test.show(3, truncate=False)
ch_test.show(1, vertical=True)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+
only showing top 3 rows

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+

In [None]:
ch_train.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)



In [None]:
ch.describe().show()

+-------+-------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+
|summary|          longitude|          latitude|housing_median_age|       total_rooms|   total_bedrooms|        population|        households|     median_income|median_house_value|
+-------+-------------------+------------------+------------------+------------------+-----------------+------------------+------------------+------------------+------------------+
|  count|              20000|             20000|             20000|             20000|            20000|             20000|             20000|             20000|             20000|
|   mean|-119.56617199999974|35.626749500000294|          28.62775|        2637.05155|         537.9918|        1425.55765|         499.52545|3.8721321550000174|      207082.71675|
| stddev| 2.0036088387792805|2.1361409537196905|12.582229355812835|2176.3147574716327|420.63111

## Добавим признаков

In [None]:
ch = ch.withColumn("median_house_value", ch["median_house_value"]/100000)
ch.take(2)

[Row(longitude='-114.310000', latitude='34.190000', housing_median_age='15.000000', total_rooms='5612.000000', total_bedrooms='1283.000000', population='1015.000000', households='472.000000', median_income='1.493600', median_house_value=0.669),
 Row(longitude='-114.470000', latitude='34.400000', housing_median_age='19.000000', total_rooms='7650.000000', total_bedrooms='1901.000000', population='1129.000000', households='463.000000', median_income='1.820000', median_house_value=0.801)]

In [None]:
ch["total_rooms"]/ch["households"]

Column<'(total_rooms / households)'>

In [None]:
roomsPerHousehold = ch.select(ch["total_rooms"]/ch["households"])
roomsPerHousehold

DataFrame[(total_rooms / households): double]

In [None]:
ch = ch.withColumn('rooms_per_household', ch["total_rooms"]/ch["households"])
ch.show(5)

+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+-------------------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population|households|median_income|median_house_value|rooms_per_household|
+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+-------------------+
|-114.310000|34.190000|         15.000000|5612.000000|   1283.000000|1015.000000|472.000000|     1.493600|             0.669| 11.889830508474576|
|-114.470000|34.400000|         19.000000|7650.000000|   1901.000000|1129.000000|463.000000|     1.820000|             0.801|  16.52267818574514|
|-114.560000|33.690000|         17.000000| 720.000000|    174.000000| 333.000000|117.000000|     1.650900|             0.857|  6.153846153846154|
|-114.570000|33.640000|         14.000000|1501.000000|    337.000000| 515.000000|226.000000|     3.191700|             0.734

In [None]:
ch = ch.withColumn('population_per_household', ch["population"]/ch["households"])
ch.show(5)

+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+-------------------+------------------------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population|households|median_income|median_house_value|rooms_per_household|population_per_household|
+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+-------------------+------------------------+
|-114.310000|34.190000|         15.000000|5612.000000|   1283.000000|1015.000000|472.000000|     1.493600|             0.669| 11.889830508474576|      2.1504237288135593|
|-114.470000|34.400000|         19.000000|7650.000000|   1901.000000|1129.000000|463.000000|     1.820000|             0.801|  16.52267818574514|      2.4384449244060473|
|-114.560000|33.690000|         17.000000| 720.000000|    174.000000| 333.000000|117.000000|     1.650900|             0.857|  6.153846153846154|

In [None]:
ch = ch.withColumn('bedrooms_per_room', ch["total_bedrooms"]/ch["total_rooms"])
ch.show(5)

+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+-------------------+------------------------+-------------------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population|households|median_income|median_house_value|rooms_per_household|population_per_household|  bedrooms_per_room|
+-----------+---------+------------------+-----------+--------------+-----------+----------+-------------+------------------+-------------------+------------------------+-------------------+
|-114.310000|34.190000|         15.000000|5612.000000|   1283.000000|1015.000000|472.000000|     1.493600|             0.669| 11.889830508474576|      2.1504237288135593|0.22861724875267284|
|-114.470000|34.400000|         19.000000|7650.000000|   1901.000000|1129.000000|463.000000|     1.820000|             0.801|  16.52267818574514|      2.4384449244060473|0.24849673202614378|
|-114.560000|33.690000|         17.000000| 72

### [Dask](https://www.dask.org/) - попытка сделать распределенный pandas

# Задание
1. Прочитать датасет пассажиров титаника в спарк
2. Вывести pie chart по количеству пассажиров в разных классах с помощью tempView и SQL
3. Найти среднюю выживаемость (% выживших) пассажиров по полу и классу с помощью tempView и SQL
4. Вывести pie chart по количеству пассажиров в разных классах методами спарка без tempView
5. Найти среднюю выживаемость пассажиров по полу и классу методами спарка без tempView

In [None]:
!wget -c https://web.stanford.edu/class/archive/cs/cs109/cs109.1166/stuff/titanic.csv

--2023-05-24 11:26:52--  https://web.stanford.edu/class/archive/cs/cs109/cs109.1166/stuff/titanic.csv
Resolving web.stanford.edu (web.stanford.edu)... 171.67.215.200, 2607:f6d0:0:925a::ab43:d7c8
Connecting to web.stanford.edu (web.stanford.edu)|171.67.215.200|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 44225 (43K) [text/csv]
Saving to: ‘titanic.csv’


2023-05-24 11:26:52 (1.15 MB/s) - ‘titanic.csv’ saved [44225/44225]



In [None]:
!head titanic.csv

Survived,Pclass,Name,Sex,Age,Siblings/Spouses Aboard,Parents/Children Aboard,Fare
0,3,Mr. Owen Harris Braund,male,22,1,0,7.25
1,1,Mrs. John Bradley (Florence Briggs Thayer) Cumings,female,38,1,0,71.2833
1,3,Miss. Laina Heikkinen,female,26,0,0,7.925
1,1,Mrs. Jacques Heath (Lily May Peel) Futrelle,female,35,1,0,53.1
0,3,Mr. William Henry Allen,male,35,0,0,8.05
0,3,Mr. James Moran,male,27,0,0,8.4583
0,1,Mr. Timothy J McCarthy,male,54,0,0,51.8625
0,3,Master. Gosta Leonard Palsson,male,2,3,1,21.075
1,3,Mrs. Oscar W (Elisabeth Vilhelmina Berg) Johnson,female,27,0,2,11.1333
