In [1]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, LongType
from pyspark.sql.functions import *

from graphframes import *
import networkx as nx

import pandas as pd
import numpy as np
import scipy as sp


import matplotlib.pyplot as plt
%matplotlib inline

import time 
import sys

import findspark
findspark.init()

In [2]:
spark = (SparkSession.builder
                    .config('spark.executor.memory', '36g')
                    .config('spark.executor.cores', '3')
                    .config('spark.executor.instance', '2')
                    .config('spark.driver.cores', '1')
                    .config('spark.driver.memory', '12g')
                    #.config('spark.extraListeners', 'sparkmonitor.listener.JupyterSparkMonitorListener')
                    #.config('spark.driver.extraClassPath', '/home/kei/.local/lib/python3.8/site-packages/sparkmonitor/listener_2.12.jar')
                    .getOrCreate())

sc = spark.sparkContext

spark.sparkContext._conf.getAll()

[('spark.app.startTime', '1671810081862'),
 ('spark.driver.host', '192.168.1.103'),
 ('spark.executor.id', 'driver'),
 ('spark.app.name', 'PySparkShell'),
 ('spark.driver.port', '39933'),
 ('spark.app.id', 'local-1671810084184'),
 ('spark.sql.catalogImplementation', 'hive'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.submit.deployMode', 'client'),
 ('spark.sql.warehouse.dir',
  'file:/home/kei/Desktop/Projects-/Innostage/spark-warehouse'),
 ('spark.ui.showConsoleProgress', 'true')]

In [3]:
sc.setCheckpointDir('/home/kei/Desktop/Projects-/Innostage/Checkpoints')

In [4]:
cleared_df = spark.read.load('/home/kei/Desktop/data/innostage/innostage_task_4/data/Stage_1/cleared_df.parquet',
                    format= 'parquet', sep= ';', inferSchema= 'true', header= 'true')

                                                                                

In [5]:
nodes = (cleared_df.select('src').distinct()).union(cleared_df.select('dst').distinct()).distinct()

edge = cleared_df

vertex = nodes.select(col('src').alias('id'))

graph = GraphFrame(vertex, edge)

In [6]:
comp0 = spark.read.load('/home/kei/Desktop/data/innostage/innostage_task_4/data/Stage_1/comp0.parquet')

In [7]:
mev_1 = ['125.194.32.161']
mev_2 = ['125.194.32.167']
mev_3 = ['125.194.32.169']
mev_4 = ['125.194.32.188']
mev_5 = ['125.194.32.189']
mev_6 = ['125.194.32.190']
mev_7 = ['125.194.32.174']
mev_8 = ['125.194.32.163']
mev_9 = ['125.194.32.69']
mev_10 = ['125.194.32.179']
mev_11 = ['125.194.32.185']
mev_12 = ['125.194.32.170']
mev_13 = ['125.194.32.187']
mev_14 = ['125.194.32.180']
mev_15 = ['125.194.32.175']
mev_16 = ['125.194.32.182']
mev_17 = ['125.194.32.176']
mev_18 = '125.194.32.183'
mev_19 = '125.194.32.191'
mev_20 = '125.194.32.162'
mev_21 = '125.194.32.173'
mev_22 = '125.194.32.186'
mev_23 = '10.77.172.2'
mev_24 = '125.194.32.32'
mev_25 = '10.77.171.51'
mev_26 = '10.77.189.12'
mev_27 = '125.194.32.64'

In [10]:
def delete_weaklylinked_vertices(graph_c0, node, graph_edges, path_save_file, file_name):
    timing = time.time()
    path = path_save_file + file_name
    
    
    
#вершину из списка удаляю из вершин и ребер графа.
    new_vertices = graph_c0.select('id').where(col('id') != node)
    new_edges = graph_edges.where(col('src') != node).where(col('dst') != node)
#   |
#создаю новый граф без этой вершины
    new_graph = GraphFrame(new_vertices, new_edges)
#   | 
#раскладываю его на компоненты
    new_connected_comp = new_graph.connectedComponents()
#   |
#группирую компоненты по "components" и узнаю количество каждого компонента
    group_by_comp = new_connected_comp.groupBy('component').count()
#   |
#выбираю только те компоненты, количество которых != 1
    unique_comp = group_by_comp.select('component').where(col('count') != 1).collect()
#   |
#создаю список этих компонентов
    component_list = [i.component for i in unique_comp]
#   |
#далее выбираю все id из датафрейма connectedComponents с условием что компонент есть в списке
    new_df_comp = new_connected_comp.select('id').where(col('component').isin(component_list))
#   |
#возвращаю вершину обратно
    used_node_df = spark.createDataFrame([(node)], 'string').toDF('id')
    new_nodes_data = new_df_comp.union(used_node_df)
#   |
#   |
#сохраняю в формате parquet объекты id
    new_nodes_data.select('id') \
                  .write.format('parquet').mode('overwrite').option('header', 'true') \
                  .save(path)
    
    
    done_time = time.time() - timing
    return print(' Done for: {0} minutes'.format(done_time // 60), '\n' \
                ,'Algorithm removed: ', graph_c0.count() - new_nodes_data.count(), ' weakly linked nodes.')

In [52]:
path_upd_data = "/home/kei/Desktop/data/innostage/innostage_task_4/data/Stage_2/"

In [53]:
filename = 'nodes_28.parquet/'

In [54]:
new_nodes = spark.read.load('/home/kei/Desktop/data/innostage/innostage_task_4/data/Stage_2/nodes_26.parquet/')

In [55]:
delete_weaklylinked_vertices(new_nodes, mev_27, graph.edges, path_upd_data, filename)



 Done for: 5.0 minutes 
 Algorithm removed:  212  weakly linked nodes.


                                                                                

In [14]:
print(' Done for: {0} minutes'.format(18000 // 60), '\n' \
     ,'Algorithm removed: ', comp0.count() - new_nodes.count(), ' weakly linked nodes.')

 Done for: 300 minutes 
 Algorithm removed:  781145  weakly linked nodes.
