In [1]:
import wave
import cv2
import numpy as np

import random
import base64
import math
import os

# 8x8 checkerboard (0 at [0, 0], alternating in both directions); XOR-ed with a
# block by the conjugate() methods below.
_row_idx, _col_idx = np.indices((8, 8))
Wc = (_row_idx + _col_idx) % 2


class Inserter:
    """Embed a text payload into an image using LSB or BPCS steganography.

    The cover image is loaded through ``File`` into ``self.ndarray`` and is
    modified by :meth:`insert_message`.  The least-significant bits of pixel
    ``(0, 0)`` carry a small header:

    * channel 0 -- 1 when the payload was passed through ``encrypt_vigenere``;
    * channel 1 -- 1 when the embedding order was shuffled with the key seed;
    * channel 2 -- 1 for BPCS, 0 for LSB.

    The embedded text is ``"<len>#<extension>#<base64 payload>"``.
    """

    def __init__(self, file_dir, string_plain_text, key):
        """Load the cover image and prepare the payload.

        Args:
            file_dir: Path of the cover image.
            string_plain_text: Text to hide; it is UTF-8 encoded and then
                base64 encoded before embedding.
            key: String used for the Vigenere step and for the shuffle seed.

        Raises:
            OSError: If ``os.path.getsize`` cannot stat ``file_dir``.
            ValueError: If the image loader returns ``None``.
        """
        image_file = File(file_dir)
        self.image_filesize = os.path.getsize(file_dir)
        self.ndarray = image_file.read_ndarray_image_file()
        if self.ndarray is None:
            # File.read_ndarray_image_file() wraps cv2.imread, which reports an
            # undecodable image by returning None instead of raising.
            raise ValueError(f"Could not read image file: {file_dir}")
        self.h, self.w, self.color = self.ndarray.shape
        self.extension = "txt"

        byte_message = bytes(string_plain_text, 'utf-8')
        self.message = base64.b64encode(byte_message).decode('utf-8')

        self.key = key

    def count_seed(self):
        """Return the sum of the code points of the key (used as RNG seed)."""
        return sum(ord(ch) for ch in self.key)

    def encrypt_message(self, encrypted, key):
        """Write the "encrypted" header flag and optionally encrypt the text.

        The flag goes into the LSB of ``ndarray[0][0][0]``; when ``encrypted``
        is truthy ``self.string_message`` is replaced by its Vigenere form.
        """
        sign = 1 if encrypted else 0

        self.ndarray[0][0][0] = self.ndarray[0][0][0] & 254 | sign
        if encrypted:
            self.string_message = encrypt_vigenere(self.string_message, key)

    def random_list(self, randomize_frames):
        """Write the "shuffled" header flag and optionally shuffle the order.

        Relies on ``self.seed`` and ``self.method`` (set by
        :meth:`insert_message`) and shuffles ``self.pixel_list`` or
        ``self.block_list`` in place with ``random`` seeded by ``self.seed``.
        """
        sign = 1 if randomize_frames else 0

        self.ndarray[0][0][1] = self.ndarray[0][0][1] & 254 | sign
        if randomize_frames:
            random.seed(self.seed)
            if self.method == 'lsb':
                random.shuffle(self.pixel_list)
            elif self.method == 'bpcs':
                random.shuffle(self.block_list)

    def modify_pixel(self, array_bit):
        """Write ``array_bit`` into sample LSBs following ``self.pixel_list``.

        Flat indices 0-2 hold the header flags and are never used for payload.

        Raises:
            RuntimeError: If the image has fewer usable samples than bits.
        """
        index = 0
        for i in self.pixel_list:
            if index >= len(array_bit):
                break
            if i >= 3:
                h, w, color = self.get_ndarray_pos(i)
                self.ndarray[h][w][color] = self.ndarray[h][w][color] & 254 | array_bit[index]
                index += 1
        if index < len(array_bit):
            error = "Ukuran pesan melebihi kapasitas payload!"
            raise RuntimeError(error)

    def insert_alpha(self):
        """Store the BPCS threshold as 7 ASCII characters (56 bits).

        The bits go into the channel-0 LSBs of rows 1-7, columns 0-7.
        """
        alpha_str = str(self.alpha)[:7].ljust(7, '0')
        alpha_bits = [int(bit) for ch in alpha_str for bit in format(ord(ch), '08b')]
        index = 0
        for i in range(1, 8):
            for j in range(8):
                self.ndarray[i][j][0] = self.ndarray[i][j][0] & 254 | alpha_bits[index]
                index += 1

    def pbc_to_cgc(self):
        """Replace ``self.ndarray`` by its 8-bit Gray-code (CGC) form."""
        b = self.ndarray
        g = b >> 7
        for i in range(7, 0, -1):
            g <<= 1
            # Gray bit (i - 1) is the XOR of binary bits i and (i - 1).
            g |= ((b >> i) & 1) ^ ((b >> (i - 1)) & 1)
        self.ndarray = g

    def cgc_to_pbc(self):
        """Replace ``self.ndarray`` (Gray code) by its plain binary form."""
        g = self.ndarray
        b = g >> 7
        for i in range(7, 0, -1):
            b_before = b.copy()
            b <<= 1
            # Binary bit = previously decoded binary bit XOR current Gray bit.
            b |= (b_before & 1) ^ ((g >> (i - 1)) & 1)
        self.ndarray = b

    def complexity(self, matrix):
        """Return the change count of ``matrix`` divided by its border count.

        Value changes are counted along one continuous row-major scan and one
        continuous column-major scan (so the step from the end of one
        row/column to the start of the next is counted as well).  The
        extractor uses the same measure, so it must not be altered on one
        side only.
        """
        rows, cols = matrix.shape[0], matrix.shape[1]
        maxim = ((rows - 1) * cols) + ((cols - 1) * rows)
        curr = 0.0
        first = matrix[0, 0]
        for i in range(rows):
            for j in range(cols):
                if first != matrix[i, j]:
                    curr = curr + 1
                    first = matrix[i, j]
        first = matrix[0, 0]
        for i in range(cols):
            for j in range(rows):
                if first != matrix[j, i]:
                    curr = curr + 1
                    first = matrix[j, i]
        return curr / maxim

    def conjugate(self, P):
        """Return ``P`` XOR-ed with the module-level checkerboard ``Wc``."""
        return P ^ Wc

    def modify_block(self, array_bit):
        """Embed ``array_bit`` into complex 8x8 bit-plane blocks (BPCS).

        Planes are visited from the least-significant one upwards.  Each
        block whose complexity exceeds ``self.alpha`` is replaced by 63
        payload bits; element ``[0, 0]`` is the conjugation flag.

        Raises:
            RuntimeError: If the eligible blocks cannot hold all the bits.
        """
        index = 0
        total_bits = len(array_bit)
        for bitplane in range(7, -1, -1):
            # bitplane 0 = msb, 7 = lsb
            shift = 7 - bitplane
            for h, w, color in self.block_list:
                if index >= total_bits:
                    break
                # NOTE(review): this guard skips every block in the first
                # block-row, first block-column and channel 0, not only the
                # header block.  The extractor applies the same guard, so it
                # is kept unchanged here.
                if h != 0 and w != 0 and color != 0:
                    block = self.ndarray[h:h+8, w:w+8, color]
                    matrix = (block >> shift) & 1
                    if self.complexity(matrix) > self.alpha:
                        current_bit = np.zeros((8, 8), dtype=int)
                        for i in range(8):
                            for j in range(8):
                                if (i > 0 or j > 0) and index < total_bits:
                                    current_bit[i, j] = array_bit[index]
                                    index += 1

                        if self.complexity(current_bit) <= self.alpha:
                            # Too simple to be recognised on extraction:
                            # conjugate it and flag that in element [0, 0].
                            current_bit = self.conjugate(current_bit)
                            current_bit[0, 0] = 1

                        # Keep the bits above and below the plane, swap the plane.
                        left = (block >> (shift + 1)) << (shift + 1)
                        right = block & ((1 << shift) - 1)
                        self.ndarray[h:h+8, w:w+8, color] = left + (current_bit << shift) + right
            if index >= total_bits:
                break
        if index < total_bits:
            error = "Ukuran pesan melebihi kapasitas payload!"
            raise RuntimeError(error)

    def get_ndarray_pos(self, idx):
        """Convert a flat sample index into ``(h, w, color)`` (row-major)."""
        color = idx % self.color
        w = (idx // self.color) % self.w
        h = idx // (self.color * self.w)
        return h, w, color

    def insert_message(self, encrypted=False, randomize=False, method='bpcs', alpha=0.3):
        """Embed the prepared payload and return the modified image array.

        Args:
            encrypted: Apply ``encrypt_vigenere`` with ``self.key`` first.
            randomize: Shuffle the pixel/block order with the key seed.
            method: ``'lsb'`` or ``'bpcs'``.
            alpha: BPCS complexity threshold (only used for ``'bpcs'``).

        Returns:
            ``self.ndarray`` after embedding.

        Raises:
            ValueError: If ``method`` is not ``'lsb'`` or ``'bpcs'``.
            RuntimeError: If the payload does not fit into the image.
        """
        if method not in ('lsb', 'bpcs'):
            # Checked before anything is written: previously an unknown method
            # set header flags and returned without embedding any payload.
            raise ValueError(f"Unknown method {method!r}; expected 'lsb' or 'bpcs'")

        self.seed = self.count_seed()
        self.method = method
        self.alpha = alpha

        self.string_message = str(len(self.message)) + '#' + self.extension + '#' + self.message
        self.encrypt_message(encrypted, self.key)

        # Eight bits per character, most-significant bit first.
        array_bit = [int(bit) for ch in self.string_message for bit in format(ord(ch), '08b')]

        if method == 'lsb':
            self.pixel_list = list(range(self.h * self.w * self.color))
            self.random_list(randomize)
            self.modify_pixel(array_bit)
            self.ndarray[0, 0, 2] = self.ndarray[0, 0, 2] & 254
        else:
            # Top-left corners of all complete 8x8 blocks, per colour channel.
            self.block_list = [
                (h, w, color)
                for h in range(0, self.h - (self.h % 8), 8)
                for w in range(0, self.w - (self.w % 8), 8)
                for color in range(0, self.color)
            ]
            self.random_list(randomize)
            self.pbc_to_cgc()
            self.modify_block(array_bit)
            self.cgc_to_pbc()
            self.insert_alpha()
            self.ndarray[0, 0, 2] = self.ndarray[0, 0, 2] & 254 | 1

        return self.ndarray

class Extractor:
    """Recover a payload embedded by ``Inserter`` from a stego image.

    Typical use: :meth:`extract_messages`, then :meth:`parse_message`, then
    :meth:`extract_secret_message_to_text`.  The two decode methods run the
    first two steps themselves when they have not been run yet.
    """

    def __init__(self, file_dir, key):
        """Load the stego image.

        Args:
            file_dir: Path of the stego image.
            key: String used for Vigenere decryption and the shuffle seed.

        Raises:
            ValueError: If the image loader returns ``None``.
        """
        stegano_image_file = File(file_dir)
        self.ndarray = stegano_image_file.read_ndarray_image_file()
        if self.ndarray is None:
            # File.read_ndarray_image_file() wraps cv2.imread, which reports an
            # undecodable image by returning None instead of raising.
            raise ValueError(f"Could not read image file: {file_dir}")
        self.h, self.w, self.color = self.ndarray.shape
        self.key = key

    def conjugate(self, P):
        """Return ``P`` XOR-ed with the module-level checkerboard ``Wc``."""
        return P ^ Wc

    def pbc_to_cgc(self):
        """Replace ``self.ndarray`` by its 8-bit Gray-code (CGC) form."""
        b = self.ndarray
        g = b >> 7
        for i in range(7, 0, -1):
            g <<= 1
            # Gray bit (i - 1) is the XOR of binary bits i and (i - 1).
            g |= ((b >> i) & 1) ^ ((b >> (i - 1)) & 1)
        self.ndarray = g

    def cgc_to_pbc(self):
        """Replace ``self.ndarray`` (Gray code) by its plain binary form."""
        g = self.ndarray
        b = g >> 7
        for i in range(7, 0, -1):
            b_before = b.copy()
            b <<= 1
            # Binary bit = previously decoded binary bit XOR current Gray bit.
            b |= (b_before & 1) ^ ((g >> (i - 1)) & 1)
        self.ndarray = b

    def complexity(self, matrix):
        """Return the change count of ``matrix`` divided by its border count.

        Value changes are counted along one continuous row-major scan and one
        continuous column-major scan.  This must stay identical to the
        measure used while embedding.
        """
        rows, cols = matrix.shape[0], matrix.shape[1]
        maxim = ((rows - 1) * cols) + ((cols - 1) * rows)
        curr = 0.0
        first = matrix[0, 0]
        for i in range(rows):
            for j in range(cols):
                if first != matrix[i, j]:
                    curr = curr + 1
                    first = matrix[i, j]
        first = matrix[0, 0]
        for i in range(cols):
            for j in range(rows):
                if first != matrix[j, i]:
                    curr = curr + 1
                    first = matrix[j, i]
        return curr / maxim

    def count_seed(self):
        """Return the sum of the code points of the key (used as RNG seed)."""
        return sum(ord(ch) for ch in self.key)

    def get_ndarray_pos(self, idx):
        """Convert a flat sample index into ``(h, w, color)`` (row-major)."""
        color = idx % self.color
        w = (idx // self.color) % self.w
        h = idx // (self.color * self.w)
        return h, w, color

    @staticmethod
    def _bits_to_text(bits):
        """Pack 0/1 values into characters, 8 bits each, MSB first.

        Trailing bits that do not fill a whole byte are dropped.  Decoding as
        latin-1 maps each byte value n to ``chr(n)``.
        """
        usable = len(bits) - (len(bits) % 8)
        packed = np.packbits(np.asarray(bits[:usable], dtype=np.uint8))
        return packed.tobytes().decode('latin-1')

    def extract_alpha(self):
        """Read the BPCS threshold stored as 7 ASCII characters.

        The 56 bits come from the channel-0 LSBs of rows 1-7, columns 0-7 and
        must be read from the plain (non Gray-coded) image.

        Raises:
            ValueError: If the recovered characters are not a valid float.
        """
        bits = [int(self.ndarray[i][j][0]) & 1 for i in range(1, 8) for j in range(8)]
        alpha_str = self._bits_to_text(bits)
        print(alpha_str)
        return float(alpha_str)

    def extract_messages(self):
        """Read the header flags and collect the embedded text.

        The result (Vigenere-decrypted when the header says so) is stored in
        ``self.string_message``.  ``self.ndarray`` is left untouched, so the
        method can be called repeatedly.
        """
        self.seed = self.count_seed()

        # LSB of every sample, flattened in the order get_ndarray_pos() uses.
        lsb_bits = (self.ndarray & 1).reshape(-1).tolist()
        encrypted = lsb_bits[0]
        random_pixels = lsb_bits[1]
        method = 'bpcs' if lsb_bits[2] else 'lsb'

        payload_bits = []

        if method == 'lsb':
            pixel_list = list(range(len(lsb_bits)))
            if random_pixels:
                random.seed(self.seed)
                random.shuffle(pixel_list)
            # Flat indices 0-2 are the header flags, never payload.
            payload_bits = [lsb_bits[i] for i in pixel_list if i >= 3]
        else:
            alpha = self.extract_alpha()
            block_list = [
                (h, w, color)
                for h in range(0, self.h - (self.h % 8), 8)
                for w in range(0, self.w - (self.w % 8), 8)
                for color in range(0, self.color)
            ]
            if random_pixels:
                random.seed(self.seed)
                random.shuffle(block_list)

            # Take a Gray-coded copy and put the original image back;
            # leaving self.ndarray converted made a second call misread the
            # header flags.
            plain = self.ndarray
            try:
                self.pbc_to_cgc()
                cgc = self.ndarray
            finally:
                self.ndarray = plain

            for bitplane in range(7, -1, -1):
                shift = 7 - bitplane
                for h, w, color in block_list:
                    # Same block guard as Inserter.modify_block.
                    if h != 0 and w != 0 and color != 0:
                        matrix = (cgc[h:h+8, w:w+8, color] >> shift) & 1
                        if self.complexity(matrix) > alpha:
                            if matrix[0, 0]:
                                matrix = self.conjugate(matrix)
                            # Element [0, 0] is the conjugation flag; the
                            # remaining 63 elements are payload in row-major order.
                            payload_bits.extend(int(bit) for bit in matrix.reshape(-1)[1:])

        message = self._bits_to_text(payload_bits)

        if encrypted:
            self.string_message = decrypt_vigenere(message, self.key)
        else:
            self.string_message = message

    def parse_message(self):
        """Parse the ``"<len>#<extension>#"`` header of ``string_message``.

        Sets ``self.len_message`` and ``self.extension``.

        Raises:
            ValueError: If the header is missing or the length is not an int.
        """
        # maxsplit=2: only the two header fields are needed.
        message_info = self.string_message.split("#", 2)
        if len(message_info) < 2:
            raise ValueError("Embedded message header not found (wrong key or not a stego image?)")
        self.len_message = int(message_info[0])
        self.extension = message_info[1]

    def _ensure_parsed(self):
        """Run extraction/parsing if the caller has not done so already."""
        if not hasattr(self, 'string_message'):
            self.extract_messages()
        if not hasattr(self, 'len_message') or not hasattr(self, 'extension'):
            self.parse_message()

    def _decode_payload(self):
        """Return the base64-decoded payload bytes that follow the header."""
        init = len(str(self.len_message)) + len(str(self.extension)) + 2
        decoded = self.string_message[init : init + self.len_message]
        return base64.b64decode(decoded.encode('utf-8'))

    def extract_secret_message_to_text(self):
        """Print and return the hidden payload as ``bytes``."""
        self._ensure_parsed()
        bytes_file = self._decode_payload()
        print("Bytes File\n",bytes_file)
        return bytes_file

    def write_secret_message(self):
        """Print and return the hidden payload as ``bytes``.

        Despite the name nothing is written to disk here; pass the result to
        ``File.write_files`` to save it.
        """
        self._ensure_parsed()
        bytes_file = self._decode_payload()

        print("BYTE FILE NIH\n", bytes_file)
        print("Selesai")
        return bytes_file



class File:
    """Thin wrapper around one path for image, WAV and raw byte I/O."""

    def __init__(self, filename):
        """Remember ``filename``; no file is opened here."""
        self.filename = filename

    def read_ndarray_image_file(self):
        """Return the image as an array via ``cv2.imread``.

        ``cv2.imread`` returns ``None`` (it does not raise) when the file
        cannot be read, so callers must check the result.
        """
        return cv2.imread(self.filename)

    def write_image_file(self, image):
        """Write ``image`` to ``self.filename`` via ``cv2.imwrite``.

        Raises:
            OSError: If ``cv2.imwrite`` reports failure (it returns ``False``
                rather than raising, e.g. when the directory is missing).
        """
        if not cv2.imwrite(self.filename, image):
            raise OSError(f"Could not write image file: {self.filename}")

    def read_frame_audio_file(self):
        """Return all audio frames of the WAV file as a ``bytearray``."""
        with wave.open(self.filename, mode='rb') as song:
            return bytearray(song.readframes(song.getnframes()))

    def init_buff_audio_file(self):
        """Return all audio frame bytes of the WAV file as a list of ints."""
        with wave.open(self.filename, mode='rb') as song:
            return list(song.readframes(-1))

    def write_audio_file(self, frame, params):
        """Write ``frame`` bytes as a WAV file using ``params``."""
        with wave.open(self.filename, mode='wb') as song:
            song.setparams(params)
            song.writeframes(frame)

    def get_params_audio(self):
        """Return the parameter tuple of the WAV file (``getparams()``)."""
        with wave.open(self.filename, mode='rb') as song:
            return song.getparams()

    def get_extention(self):
        """Return (and store in ``self.extension``) the file extension.

        The extension is the text after the last ``"."`` of the path, or
        ``""`` when the last ``"/"``-separated component contains no dot.
        """
        basename = self.filename.split("/")[-1]
        self.extension = self.filename.split(".")[-1] if "." in basename else ""
        return self.extension

    def read_files(self):
        """Return the whole file content as ``bytes``."""
        with open(self.filename, "rb") as f:
            return f.read()

    def write_files(self, bytes_file):
        """Write ``bytes_file`` to the file, replacing existing content."""
        # The payload is no longer echoed to stdout: printing the whole file
        # was debugging output and floods the notebook for large payloads.
        with open(self.filename, 'wb') as fd:
            fd.write(bytes_file)

            
def encrypt_vigenere(plainText, key):
    """Shift every character of ``plainText`` by the upper-cased key.

    Character ``i`` becomes ``chr((ord(c) + ord(K[i % len(K)])) % 256)`` where
    ``K`` is ``key.upper()``.

    Args:
        plainText: Text to encrypt.
        key: Non-empty key string (may be empty only if ``plainText`` is).

    Returns:
        The encrypted string; every character has a code point below 256.

    Raises:
        ValueError: If ``key`` is empty while ``plainText`` is not
            (previously a ``ZeroDivisionError``).
    """
    if not plainText:
        return ""
    if not key:
        raise ValueError("key must not be empty")

    key = key.upper()
    key_len = len(key)
    return ''.join(
        chr((ord(ch) + ord(key[i % key_len])) % 256)
        for i, ch in enumerate(plainText)
    )


def decrypt_vigenere(cipherText, key):
    """Undo the per-character shift applied with the upper-cased key.

    Character ``i`` becomes ``chr((ord(c) - ord(K[i % len(K)])) % 256)`` where
    ``K`` is ``key.upper()``.

    Args:
        cipherText: Text to decrypt.
        key: Non-empty key string (may be empty only if ``cipherText`` is).

    Returns:
        The decrypted string; every character has a code point below 256.

    Raises:
        ValueError: If ``key`` is empty while ``cipherText`` is not
            (previously a ``ZeroDivisionError``).
    """
    if not cipherText:
        return ""
    if not key:
        raise ValueError("key must not be empty")

    key = key.upper()
    key_len = len(key)
    return ''.join(
        chr((ord(ch) - ord(key[i % key_len])) % 256)
        for i, ch in enumerate(cipherText)
    )


## Video to Frames Extractor

In [None]:
# Split the source video into one PNG per frame under frames/.
video_path = 'sample/video/alienoid-trailer.mp4'
frames_dir = 'frames'
# cv2.imwrite returns False instead of raising when the directory is missing.
os.makedirs(frames_dir, exist_ok=True)

cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    # Previously an unopened video silently produced total_frames == 0.
    raise IOError(f"Could not open video: {video_path}")

# fps / width / height are reused by the cell that rebuilds the video.
fps = cap.get(cv2.CAP_PROP_FPS)
width = cap.get(cv2.CAP_PROP_FRAME_WIDTH)
height = cap.get(cv2.CAP_PROP_FRAME_HEIGHT)
print("FPS:", fps)
print("HEIGHT:", height)
print("WIDTH:", width)

total_frames = 0
try:
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        frame_path = f'{frames_dir}/alienoid-trailer{total_frames}.png'
        if not cv2.imwrite(frame_path, frame):
            raise IOError(f"Could not write frame: {frame_path}")
        print(f"extract frame ke-{total_frames}")
        total_frames += 1
finally:
    # Release the capture even if writing a frame fails.
    cap.release()

In [None]:
# Inputs of the single-image demo cell below: the message file and the cover image.
dir_pesan = "pesan.txt"
dir_gambar = "alienoid.png"

In [None]:
# Display the number of frames counted by the frame-extraction cell.
total_frames

In [None]:
# Embed the message file's contents into the cover image.
# Inserter takes the plaintext itself (its second parameter is
# string_plain_text), so the file has to be read first; passing dir_pesan
# directly would hide the literal path "pesan.txt".
with open(dir_pesan, 'r') as pesan_file:
    plain_text = pesan_file.read()

instr = Inserter(dir_gambar, plain_text, "bebas")
np_image = instr.insert_message(encrypted=True, randomize=True, alpha=0.3)

out_filename = "alienoid_stego_image_alpha0.3.png"
outfile = File(out_filename)
outfile.write_image_file(np_image)

## Decrypt

In [None]:
# NOTE(review): the insertion cell above writes
# "alienoid_stego_image_alpha0.3.png"; confirm that this file name is the
# intended input.
dir_gambar_stego_image = "alienoid_stego_image.png"
key = "bebas"
extrctr = Extractor(dir_gambar_stego_image, key)
# extract_secret_message_to_text() reads string_message, len_message and
# extension, which are only set by the two calls below.
extrctr.extract_messages()
extrctr.parse_message()
extrctr.extract_secret_message_to_text()
# To save the payload to disk instead:
# byte = extrctr.write_secret_message()
# output = 'pesan_extracted.txt'
# output_file = File(output)
# output_file.write_files(byte)

## Read Text File

In [None]:
# Read the whole message file; the context manager closes the handle even
# if read() raises.
with open("pesan.txt", 'r') as f:
    characters = f.read()
characters

In [None]:
def split_all_characters(characters):
    """Return the individual characters of ``characters`` as a list."""
    return [ch for ch in characters]

In [None]:
dir_pesan = "pesan.txt"


def split_string(characters, n_parts):
    """Split ``characters`` into exactly ``n_parts`` contiguous chunks.

    The first ``len(characters) % n_parts`` chunks are one character longer;
    chunks are empty when the text is shorter than ``n_parts``.

    NOTE(review): the notebook called ``split_string`` without defining it;
    this even split is the assumed intent -- confirm.
    """
    if n_parts <= 0:
        return []
    base, extra = divmod(len(characters), n_parts)
    parts = []
    start = 0
    for i in range(n_parts):
        end = start + base + (1 if i < extra else 0)
        parts.append(characters[start:end])
        start = end
    return parts


def generate_text_parts(dir_pesan, n_parts=None):
    """Split a text file into per-frame parts under ``message_parts/``.

    Args:
        dir_pesan: Path of the text file to split.
        n_parts: Number of parts to write; defaults to the notebook-level
            ``total_frames``.
    """
    if n_parts is None:
        n_parts = total_frames

    with open(dir_pesan, 'r') as pesan:
        characters = pesan.read()
    batch_message = split_string(characters, n_parts)

    # open(..., 'w') does not create missing directories.
    os.makedirs("message_parts", exist_ok=True)
    for i, part in enumerate(batch_message):
        with open(f"message_parts/pesan{i}.txt", 'w') as new_text_file:
            new_text_file.write(part)
        print(f"Writing files ...{i}")
        

def batch_inserter():
    """Embed message part ``i`` into frame ``i`` for every extracted frame.

    Reads ``frames/alienoid-trailer{i}.png`` and ``message_parts/pesan{i}.txt``
    and writes ``frames_stego/alienoid_stego_image{i}.png`` for each ``i`` in
    ``range(total_frames)``.
    """
    # cv2.imwrite returns False instead of raising when the directory is missing.
    os.makedirs("frames_stego", exist_ok=True)
    for i in range(total_frames):
        dir_gambar = f"frames/alienoid-trailer{i}.png"
        # Inserter expects the text itself, not a path: passing the path
        # would embed the file name instead of the message part.
        with open(f"message_parts/pesan{i}.txt", 'r') as part_file:
            plain_text = part_file.read()
        instr = Inserter(dir_gambar, plain_text, "bebas")
        np_image = instr.insert_message(encrypted=True, randomize=True, alpha=0.3)

        out_filename = f"frames_stego/alienoid_stego_image{i}.png"
        outfile = File(out_filename)
        outfile.write_image_file(np_image)
        print(f"inserting message into frame-{i}")

In [None]:
# Write the message parts (message_parts/pesan{i}.txt) used by batch_inserter().
generate_text_parts(dir_pesan)

In [None]:
# Embed every message part into its frame and write frames_stego/.
batch_inserter()

## Frames to Video

In [None]:
 # choose codec according to format needed

# NOTE(review): 'mp4v' is presumably a lossy codec, in which case the
# embedded bits would not survive in the encoded video -- confirm whether a
# lossless codec/container is needed.
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
# fps is passed unrounded: int(fps) would turn e.g. 23.976 into 23 and change
# the playback speed.
video = cv2.VideoWriter('alienoid-trailer-stego-vid.mp4', fourcc, fps, (int(width), int(height)))

try:
    for j in range(total_frames):
        path = 'frames_stego/alienoid_stego_image' + str(j) + '.png'
        img = cv2.imread(path)
        if img is None:
            # cv2.imread returns None for a missing/unreadable file.
            raise FileNotFoundError(f"Could not read stego frame: {path}")
        video.write(img)
        print(f"Menjahit frame jadi video {j}")
finally:
    # Finalise the container even if a frame is missing.
    video.release()

## Stego Frames to Plaintext

In [None]:
# Recover the message by decoding every stego frame in order.
result = ""
for i in range(total_frames):
    print(f"Plaintext ke-{i}")
    # batch_inserter() writes the stego frames into frames_stego/.
    dir_gambar_stego_image = f"frames_stego/alienoid_stego_image{i}.png"
    key = "bebas"
    extrctr = Extractor(dir_gambar_stego_image, key)
    extrctr.extract_messages()
    extrctr.parse_message()
    # The extractor returns bytes; decode before appending to the str result.
    result += extrctr.extract_secret_message_to_text().decode('utf-8')
#     byte = extrctr.write_secret_message()
#     output = 'pesan_extracted.txt'
#     output_file = File(output)
#     output_file.write_files(byte)

In [None]:
# Display the frame count again (set by the frame-extraction cell).
total_frames

In [None]:
# Prepare an extractor for the single stego image: read the embedded text and
# parse its "<len>#<ext>#" header so the payload can be decoded afterwards.
key = "bebas"
dir_gambar_stego_image = "alienoid_stego_image.png"
extrctr = Extractor(dir_gambar_stego_image, key)
extrctr.extract_messages()
extrctr.parse_message()

In [None]:
# Decode and display the payload of the extractor prepared in the previous cell.
extrctr.extract_secret_message_to_text()

In [None]:
import os
# Size of the cover image file in bytes.
sz = os.path.getsize("alienoid.png")

In [None]:
# Scratch check: list() on a string yields one element per character,
# including the newline and the tab.
list("helloworld\n\t")

In [None]:
# NOTE(review): 193 is hard-coded; presumably the number of stego frames on
# disk -- confirm, or use total_frames when it is available.
n_frames = 193

message = ""
for i in range(n_frames):
    dir_gambar_stego_image = f"frames_stego/alienoid_stego_image{i}.png"
    key = "bebas"
    extrctr = Extractor(dir_gambar_stego_image, key)
    extrctr.extract_messages()
    extrctr.parse_message()
    # i / n_frames is a fraction; scale it so the "%" label is correct.
    print(f"PROGRESS: {100 * i / n_frames:.1f}%")
    # str(bytes)[2:] kept repr escapes and the trailing quote; decode instead.
    message += extrctr.extract_secret_message_to_text().decode('utf-8')

# Printed once: printing the growing message every iteration floods the output.
print(message)

In [None]:
# Scratch check of the encoding Inserter applies: UTF-8 bytes -> base64 text.
a = "makanan nih cuy".encode('utf-8')
base64.b64encode(a).decode('utf-8')

In [2]:
# Build an inserter whose payload is passed directly as a string (not read from a file).
inserter = Inserter("alienoid.png", "Ini adalah pesan rahasia pakai string, bukan pakai file", "bebas")

In [3]:
# Embed the string payload: Vigenere-encrypted, shuffled block order, default
# method ('bpcs') with threshold alpha=0.3.
np_image = inserter.insert_message(encrypted=True, randomize=True, alpha=0.3)

# Save under a new name so the cover image is not overwritten.
out_filename = "alienoid_stego_image_dari_string.png"
outfile = File(out_filename)
outfile.write_image_file(np_image)

In [4]:
# Load the stego image written above, using the same key as the inserter.
extractor = Extractor("alienoid_stego_image_dari_string.png", "bebas")

In [5]:
# Full extraction pipeline: read the embedded text, parse the "<len>#<ext>#"
# header, then base64-decode the payload.
extractor.extract_messages()
extractor.parse_message()
extractor.extract_secret_message_to_text()

0.30000
Bytes File
 b'Ini adalah pesan rahasia pakai string, bukan pakai file'


b'Ini adalah pesan rahasia pakai string, bukan pakai file'

In [7]:
# Keep the decoded payload (a bytes object) for inspection.
a = extractor.extract_secret_message_to_text()

Bytes File
 b'Ini adalah pesan rahasia pakai string, bukan pakai file'


In [8]:
# str() on bytes gives the repr form (b'...'), as the output below shows; it
# does not decode the bytes to text.
str(a)

"b'Ini adalah pesan rahasia pakai string, bukan pakai file'"