# IPC

## Advanced FILE I/O

![io01](resources/io01.png)

----------------

## Pipe

![pipe01](resources/pipe01.png)
![pipe02](resources/pipe02.png)

1.  pipe就是一个文件，在内核中开辟了一段buffer空间用于两个进程间传递数据。这个特殊的文件通过一个信号量进行读写控制

-------------------------

### linux2.6/linux/include/pipe_fs_i.h

```c
struct pipe_inode_info {
	wait_queue_head_t wait;
	unsigned int nrbufs, curbuf;
	struct pipe_buffer bufs[PIPE_BUFFERS];
	struct page *tmp_page;
	unsigned int start;
	unsigned int readers;
	unsigned int writers;
	unsigned int waiting_writers;
	unsigned int r_counter;
	unsigned int w_counter;
	struct fasync_struct *fasync_readers;
	struct fasync_struct *fasync_writers;
};
```

-------------------------

### linux2.6/fs/pipe.c

```c
static ssize_t
pipe_writev(struct file *filp, const struct iovec *_iov,
	    unsigned long nr_segs, loff_t *ppos)
{
	struct inode *inode = filp->f_dentry->d_inode;
	struct pipe_inode_info *info;
	ssize_t ret;
	int do_wakeup;
	struct iovec *iov = (struct iovec *)_iov;
	size_t total_len;

	total_len = iov_length(iov, nr_segs);
	/* Null write succeeds. */
	if (unlikely(total_len == 0))
		return 0;

	do_wakeup = 0;
	ret = 0;
	down(PIPE_SEM(*inode));
	info = inode->i_pipe;

	if (!PIPE_READERS(*inode)) {
		send_sig(SIGPIPE, current, 0);
		ret = -EPIPE;
		goto out;
	}


    /* by xitongsys
        If the total_len < current buf capacity, just copy all of them to the
        buffer. No need to go into the loop.
        
        See details in the following comment.
    */

	/* We try to merge small writes */
	if (info->nrbufs && total_len < PAGE_SIZE) {
		int lastbuf = (info->curbuf + info->nrbufs - 1) & (PIPE_BUFFERS-1);
		struct pipe_buffer *buf = info->bufs + lastbuf;
		struct pipe_buf_operations *ops = buf->ops;
		int offset = buf->offset + buf->len;
		if (ops->can_merge && offset + total_len <= PAGE_SIZE) {
			void *addr = ops->map(filp, info, buf);
			int error = pipe_iov_copy_from_user(offset + addr, iov, total_len);
			ops->unmap(info, buf);
			ret = error;
			do_wakeup = 1;
			if (error)
				goto out;
			buf->len += total_len;
			ret = total_len;
			goto out;
		}
			
	}

	for (;;) {
		int bufs;
		if (!PIPE_READERS(*inode)) {
			send_sig(SIGPIPE, current, 0);
			if (!ret) ret = -EPIPE;
			break;
		}
		bufs = info->nrbufs;
		if (bufs < PIPE_BUFFERS) {
			ssize_t chars;
			int newbuf = (info->curbuf + bufs) & (PIPE_BUFFERS-1);
			struct pipe_buffer *buf = info->bufs + newbuf;
			struct page *page = info->tmp_page;
			int error;

            /* by xitongsys
               Every page for the buffer is just written once even it is not full.
               So we need alloc_page every loop.

               The reason is the buffer has a field `len`. The reader read the field
               and read the data. So we can't change the `len` in different writers.
            */

			if (!page) {
				page = alloc_page(GFP_HIGHUSER);
				if (unlikely(!page)) {
					ret = ret ? : -ENOMEM;
					break;
				}
				info->tmp_page = page;
			}
			/* Always wakeup, even if the copy fails. Otherwise
			 * we lock up (O_NONBLOCK-)readers that sleep due to
			 * syscall merging.
			 * FIXME! Is this really true?
			 */
			do_wakeup = 1;
			chars = PAGE_SIZE;
			if (chars > total_len)
				chars = total_len;

			error = pipe_iov_copy_from_user(kmap(page), iov, chars);
			kunmap(page);
			if (unlikely(error)) {
				if (!ret) ret = -EFAULT;
				break;
			}
			ret += chars;

			/* Insert it into the buffer array */
			buf->page = page;
			buf->ops = &anon_pipe_buf_ops;
			buf->offset = 0;
			buf->len = chars;
			info->nrbufs = ++bufs;

            /* by xitongsys
                alloc a new page for every loop
            */
			info->tmp_page = NULL;

			total_len -= chars;
			if (!total_len)
				break;
		}
		if (bufs < PIPE_BUFFERS)
			continue;
		if (filp->f_flags & O_NONBLOCK) {
			if (!ret) ret = -EAGAIN;
			break;
		}
		if (signal_pending(current)) {
			if (!ret) ret = -ERESTARTSYS;
			break;
		}

        /* by xitongsys
            1. Firstly wake up the sync waiters and then the async watiers
            2. see the comments and codes bellow
        */
		if (do_wakeup) {
			wake_up_interruptible_sync(PIPE_WAIT(*inode));
			kill_fasync(PIPE_FASYNC_READERS(*inode), SIGIO, POLL_IN);
			do_wakeup = 0;
		}
		PIPE_WAITING_WRITERS(*inode)++;
		pipe_wait(inode);
		PIPE_WAITING_WRITERS(*inode)--;
	}
out:
	up(PIPE_SEM(*inode));
	if (do_wakeup) {
		wake_up_interruptible(PIPE_WAIT(*inode));
		kill_fasync(PIPE_FASYNC_READERS(*inode), SIGIO, POLL_IN);
	}
	if (ret > 0)
		inode_update_time(inode, 1);	/* mtime and ctime */
	return ret;
}

static ssize_t
pipe_write(struct file *filp, const char __user *buf,
	   size_t count, loff_t *ppos)
{
	struct iovec iov = { .iov_base = (void __user *)buf, .iov_len = count };
	return pipe_writev(filp, &iov, 1, ppos);
}

```
1. see comments in the codes


--------------------------

### linux2.6/kernel/sched.c

```c
/*
 * The core wakeup function.  Non-exclusive wakeups (nr_exclusive == 0) just
 * wake everything up.  If it's an exclusive wakeup (nr_exclusive == small +ve
 * number) then we wake all the non-exclusive tasks and one exclusive task.
 *
 * There are circumstances in which we can try to wake a task which has already
 * started to run but is not in state TASK_RUNNING.  try_to_wake_up() returns
 * zero in this (rare) case, and we handle it by continuing to scan the queue.
 */
static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
			     int nr_exclusive, int sync, void *key)
{
	struct list_head *tmp, *next;

	list_for_each_safe(tmp, next, &q->task_list) {
		wait_queue_t *curr;
		unsigned flags;
		curr = list_entry(tmp, wait_queue_t, task_list);
		flags = curr->flags;
		if (curr->func(curr, mode, sync, key) &&
		    (flags & WQ_FLAG_EXCLUSIVE) &&
		    !--nr_exclusive)
			break;
	}
}
```

1. used for wake up sync waiters. It just call the callback func `curr->func`

--------------------------

### linux2.6/fs/fcntl.c

```c

void __kill_fasync(struct fasync_struct *fa, int sig, int band)
{
	while (fa) {
		struct fown_struct * fown;
		if (fa->magic != FASYNC_MAGIC) {
			printk(KERN_ERR "kill_fasync: bad magic number in "
			       "fasync_struct!\n");
			return;
		}
		fown = &fa->fa_file->f_owner;
		/* Don't send SIGURG to processes which have not set a
		   queued signum: SIGURG has its own default signalling
		   mechanism. */
		if (!(sig == SIGURG && fown->signum == 0))
			send_sigio(fown, fa->fa_fd, band);
		fa = fa->fa_next;
	}
}

EXPORT_SYMBOL(__kill_fasync);

void kill_fasync(struct fasync_struct **fp, int sig, int band)
{
	/* First a quick test without locking: usually
	 * the list is empty.
	 */
	if (*fp) {
		read_lock(&fasync_lock);
		/* reread *fp after obtaining the lock */
		__kill_fasync(*fp, sig, band);
		read_unlock(&fasync_lock);
	}
}
EXPORT_SYMBOL(kill_fasync);

```

1. used to send signal to the async waiters in the queue

```c
kill_fasync(PIPE_FASYNC_READERS(*inode), SIGIO, POLL_IN);
```

------------------------






## FIFO

![pipe03](resources/pipe03.png)

-------------

## IPC

![ipc01](resources/ipc01.png)
![ipc02](resources/ipc02.png)
![ipc03](resources/ipc03.png)




## IPC Semaphores

![sema01](resources/sema01.png)
![sema02](resources/sema02.png)
![sema03](resources/sema03.png)
![sema04](resources/sema04.png)

## IPC shared memory

![ipcshm01](resources/ipcshm01.png)
![ipcshm02](resources/ipcshm02.png)
![ipcshm03](resources/ipcshm03.png)
![ipcshm04](resources/ipcshm04.png)