In [2]:
import pyspark
import pyarrow 

import pandas as pd
import numpy as np

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

In [3]:
# Build (or reuse) the local Spark session. `getOrCreate` keeps this cell safe
# to re-run: constructing a second SparkContext in the same kernel raises an
# error, whereas the builder hands back the session that already exists.
ss = (
    SparkSession.builder
    .master('local[4]')   # local mode, 4 worker threads
    .appName('notebook')
    .getOrCreate()
)
# Low-level context, exposed under the same name the notebook used before.
sc = ss.sparkContext

In [4]:
from hdfs import InsecureClient

# WebHDFS client that authenticates only by user name (no Kerberos).
# NOTE(review): 8888 is also Jupyter's default port — confirm this really is
# the WebHDFS endpoint. The client is not used in the cells visible here.
hdfs_client = InsecureClient(url="http://localhost:8888", user='hdfs')

In [5]:
# Quick pandas preview of the raw edge list.
# Note: the name `df` is rebound to a Spark DataFrame in a later cell.
df = pd.read_csv('/Users/starrovoyt/Desktop/hse/course_work/notebooks/data.scv')
df.head()

Unnamed: 0,id1,id_type,neighbour_id,neighbour_id_type
0,gfhjk,login,1,other
1,gfhjk,login,3,other
2,gfhjk,login,2,other
3,fghj,login,1,other
4,sdftygu,login,1,other


In [6]:
# Explicit schema: every column is read as a nullable string, so Spark does
# not have to infer types from the file.
schema = StructType([
    StructField("id", StringType(), True),
    StructField("id_type", StringType(), True),
    StructField("neighbour_id", StringType(), True),
    StructField("neighbour_id_type", StringType(), True),
])


# `header=True` consumes the file's first line; the column names used by the
# DataFrame are the ones declared in `schema`.
df = ss.read.csv('/Users/starrovoyt/Desktop/hse/course_work/notebooks/data.scv', header=True, schema=schema)
df.printSchema()

# Expose the DataFrame to Spark SQL under the name `edges_table`.
# `createOrReplaceTempView` supersedes `registerTempTable`, which has been
# deprecated since Spark 2.0.
df.createOrReplaceTempView("edges_table")

df.show()

root
 |-- id: string (nullable = true)
 |-- id_type: string (nullable = true)
 |-- neighbour_id: string (nullable = true)
 |-- neighbour_id_type: string (nullable = true)

+----------+-------+------------+-----------------+
|        id|id_type|neighbour_id|neighbour_id_type|
+----------+-------+------------+-----------------+
|     gfhjk|  login|           1|            other|
|     gfhjk|  login|           3|            other|
|     gfhjk|  login|           2|            other|
|      fghj|  login|           1|            other|
|   sdftygu|  login|           1|            other|
|      dfgh|  login|           1|            other|
|    qwedfg|  login|           1|            other|
|     rtyui|  login|           1|            other|
|jhsdgflsdf|  login|           1|            other|
|  skjdfhsd|  login|           1|            other|
+----------+-------+------------+-----------------+



In [7]:
# Count neighbour rows per (id, id_type) for ids of type 'login'.
# NOTE(review): count(map(...)) builds a map per row only to count it —
# presumably count(*) or a distinct count was intended; confirm.
neighbours_query = """
select id, id_type, count(map(neighbour_id, neighbour_id_type)) as neighbours_number
from edges_table
where id_type = 'login'
group by id, id_type
"""
neighbours = ss.sql(neighbours_query)

neighbours.show()

+----------+-------+-----------------+
|        id|id_type|neighbours_number|
+----------+-------+-----------------+
|     rtyui|  login|                1|
|      fghj|  login|                1|
|  skjdfhsd|  login|                1|
|      dfgh|  login|                1|
|jhsdgflsdf|  login|                1|
|   sdftygu|  login|                1|
|     gfhjk|  login|                3|
|    qwedfg|  login|                1|
+----------+-------+-----------------+



In [8]:
# Make the aggregated counts queryable by name from Spark SQL.
# `createOrReplaceTempView` supersedes `registerTempTable`, which has been
# deprecated since Spark 2.0.
neighbours.createOrReplaceTempView("neighbours")
# Look up the neighbour count for one specific login.
neighbours_number = ss.sql(
"""
select neighbours_number
from neighbours
where id = 'gfhjk' and id_type = 'login'
""").toDF('neighbours_number')

In [9]:
# Print the lookup result as a text table.
neighbours_number.show()

+-----------------+
|neighbours_number|
+-----------------+
|                3|
+-----------------+



In [10]:
# Collect the result to the driver as a pandas frame and read the count from
# its first row.
neighbours_number.toPandas()['neighbours_number'][0]

3

In [11]:
# Re-read the same edge file with the same schema, rebinding `df` to a fresh
# Spark DataFrame.
df = (
    ss.read
    .schema(schema)
    .option('header', True)
    .csv('/Users/starrovoyt/Desktop/hse/course_work/notebooks/data.scv')
)

In [47]:
from pyspark.sql import Row
from functools import partial

class mapper(object):
    """Callable that tags a row's ``id`` with a fixed id type.

    Instances are passed to ``rdd.map``; the id type is supplied once at
    construction time instead of being read from each row.
    """

    def __init__(self, id_type):
        """Store the id-type label appended to every row's id."""
        self.id_type = id_type

    def __call__(self, row):
        """Return ``{'id + id_type': '<row.id> <id_type>'}`` for one row.

        ``row`` only needs an ``id`` attribute holding a string.
        """
        combined = row.id + ' ' + self.id_type
        return {'id + id_type': combined}

# Earlier plain-function variant, kept for reference and not executed. It takes
# id_type as a second argument, so it would need that argument bound before
# being handed to rdd.map — presumably via functools.partial.
# def trial_mapper(row, id_type):
#     return {'id + id_type': row.id + ' ' + id_type}

# Run the callable over every row of the underlying RDD, then turn the
# resulting dicts back into a DataFrame (no schema is passed to toDF).
new_df = df.rdd.map(mapper('login')).toDF()

In [48]:
# Collect the mapped rows to the driver as a pandas DataFrame for display.
new_df.toPandas()

Unnamed: 0,id + id_type
0,gfhjk login
1,gfhjk login
2,gfhjk login
3,fghj login
4,sdftygu login
5,dfgh login
6,qwedfg login
7,rtyui login
8,jhsdgflsdf login
9,skjdfhsd login


In [35]:
# Sample rows without replacement. `fraction` is a per-row inclusion
# probability, so the result is about half of the rows, not exactly half.
# The fixed seed makes the sample reproducible across re-runs.
df_sample = df.sample(withReplacement=False, fraction=0.5, seed=42)
type(df_sample)

pyspark.sql.dataframe.DataFrame