In [16]:
# Print the version of the `python` executable that the notebook's shell resolves to.
!python --version

Python 3.10.12


In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=7d4a284c4bbc69705f9c23406badfefea82f2f7f7dec10085c058e121df87928
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


In [2]:
pip show pyspark

Name: pyspark
Version: 3.5.0
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: dev@spark.apache.org
License: http://www.apache.org/licenses/LICENSE-2.0
Location: /usr/local/lib/python3.10/dist-packages
Requires: py4j
Required-by: 


In [3]:
# Start PySpark: get the active Spark session or create a new one named 'myApp'.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName('myApp')
    .getOrCreate()
)

# Create a small DataFrame from in-memory (Name, Age) rows.
data = [("Alice", 1), ("Bob", 2), ("Charlie", 3)]
df = spark.createDataFrame(data, schema=["Name", "Age"])

# Print the DataFrame contents as a text table.
df.show()

+-------+---+
|   Name|Age|
+-------+---+
|  Alice|  1|
|    Bob|  2|
|Charlie|  3|
+-------+---+



In [4]:
import pyspark.sql.functions as F
from pyspark.sql.types import FloatType, TimestampType, StringType


# Two small sample DataFrames: a three-column table and an id-only table.
df = spark.createDataFrame(
    [[1, 2, 3], [2, 3, 4], [4, 5, 6]],
    schema=['id', 'a', 'b'],
)
df2 = spark.createDataFrame([[1], [2]], schema=['id'])

# Print both tables, in the order they were created.
for frame in (df, df2):
    frame.show()

+---+---+---+
| id|  a|  b|
+---+---+---+
|  1|  2|  3|
|  2|  3|  4|
|  4|  5|  6|
+---+---+---+

+---+
| id|
+---+
|  1|
|  2|
+---+



# 作成したDataFrameのうち、IDが20番台のレコードを取得したい

In [5]:
# Initial rows of `sdf`, with columns (id, a, b).
sdf = spark.createDataFrame(
    [[1, 'abc', 123], [2, 'aaa', 456], [4, 'bbb', 678]],
    schema=['id', 'a', 'b'],
)

In [7]:
# Additional (id, a, b) rows to append to `sdf`.
data2 = [
    (3, 'red', 234), (5, 'yellow', 234), (6, 'ccc', 123),
    (7, 'a', 123), (8, 'b', 123), (9, 'c', 123), (10, 'd', 123),
    (11, 'e', 234), (12, 'f', 234), (13, 'g', 123), (14, 'h', 123),
    (15, 'i', 123), (16, 'j', 123), (17, 'k', 123), (18, 'l', 123),
    (19, 'm', 123), (20, 'n', 123), (21, 'o', 234), (22, 'p', 234),
    (23, 'q', 123), (24, 'r', 123), (25, 's', 123), (26, 't', 123),
    (27, 'u', 123), (28, 'v', 123), (29, 'w', 123), (30, 'x', 123),
    (31, 'y', 123),
]

sdf2 = spark.createDataFrame(data2, schema=['id', 'a', 'b'])

# NOTE: this rebinds `sdf` to itself plus `sdf2`, so running this cell again
# without first re-creating `sdf` appends the same rows a second time.
sdf = sdf.union(sdf2)

display(sdf)

DataFrame[id: bigint, a: string, b: bigint]

In [36]:
# Show every row. The row count is taken from the DataFrame itself instead of
# a hard-coded 31, so the cell still prints the whole table if rows are added
# or removed upstream.
sdf.show(sdf.count())

+---+------+---+
| id|     a|  b|
+---+------+---+
|  1|   abc|123|
|  2|   aaa|456|
|  4|   bbb|678|
|  3|   red|234|
|  5|yellow|234|
|  6|   ccc|123|
|  7|     a|123|
|  8|     b|123|
|  9|     c|123|
| 10|     d|123|
| 11|     e|234|
| 12|     f|234|
| 13|     g|123|
| 14|     h|123|
| 15|     i|123|
| 16|     j|123|
| 17|     k|123|
| 18|     l|123|
| 19|     m|123|
| 20|     n|123|
| 21|     o|234|
| 22|     p|234|
| 23|     q|123|
| 24|     r|123|
| 25|     s|123|
| 26|     t|123|
| 27|     u|123|
| 28|     v|123|
| 29|     w|123|
| 30|     x|123|
| 31|     y|123|
+---+------+---+



In [35]:
# Passing each id from 20 to 29 to isin() returns the records with those ids,
# but spelling every value out is a little tedious.
sdf.filter(
    F.col("id").isin(20, 21, 22, 23, 24, 25, 26, 27, 28, 29)
).show()

+---+---+---+
| id|  a|  b|
+---+---+---+
| 20|  n|123|
| 21|  o|234|
| 22|  p|234|
| 23|  q|123|
| 24|  r|123|
| 25|  s|123|
| 26|  t|123|
| 27|  u|123|
| 28|  v|123|
| 29|  w|123|
+---+---+---+



In [10]:
# Attempt to express "ids in the 20s" with range(): the range object is handed
# to isin() as-is, and the call fails with SparkRuntimeException.
# The error is the point of this cell, so it is caught and printed; left
# uncaught it would stop "Run All" before the working version further down.
from pyspark.errors import SparkRuntimeException

try:
    sdf.filter(
        F.col("id").isin(range(20, 30))
    )
except SparkRuntimeException as exc:
    print(f"{type(exc).__name__}: {exc}")

SparkRuntimeException: ignored

In [18]:
# Inspect the type of the object passed to isin() in the failing cell: it is `range`, not a list.
type(range(20, 30))

range

In [19]:
# Wrapping the range in list() produces a plain `list`.
type(list(range(20, 30)))

list

In [12]:
# Converting the range to a list with list() before passing it to isin()
# successfully returns the records whose ids are in the 20s.
sdf.filter(
    F.col("id").isin(list(range(20, 30)))
).show()

+---+---+---+
| id|  a|  b|
+---+---+---+
| 20|  n|123|
| 21|  o|234|
| 22|  p|234|
| 23|  q|123|
| 24|  r|123|
| 25|  s|123|
| 26|  t|123|
| 27|  u|123|
| 28|  v|123|
| 29|  w|123|
+---+---+---+

